Bleh, fix ib's `Client.bars()` recursion..

Turns out this was the main source of all sorts of gaps and overlaps
in history frame backfilling. The original idea was that when a gap
causes not enough (1m) bars to be delivered (like over a weekend or
holiday) we just implicitly do another frame query to try and at
least fill out the default duration (normally 1-2 days). Doing that
recursion sloppily was causing all sorts of stupid problems..

It's kinda obvious in hindsight what was wrong:
- always pass the sampling period (timeframe) when recursing
- don't gate the recursion branch as an `elif` of the no-data case;
  the two conditions are already mutually exclusive on their own
- pack into the `numpy` array BEFORE the recursive call so the next
  `end_dt: DateTime` is selected (from the first delivered bar's
  timestamp) and passed correctly (see the sketch below)!
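
A minimal sketch of the fixed recursion shape, assuming a hypothetical
`query_frame()` stand-in for the real ib request plumbing shown in the
diffs below:

```python
import numpy as np

SAMPLE_PERIOD_S = 60          # 1m bars
FRAME_DURATION_S = 2 * 86400  # default ~2 day frame duration


def query_frame(end_ts: float) -> np.ndarray:
    # hypothetical stand-in for one ib history request ending at
    # `end_ts`; over a weekend/holiday the delivered bars can span
    # less wall-clock time than `FRAME_DURATION_S`.
    raise NotImplementedError


def get_bars(end_ts: float) -> np.ndarray:
    times: np.ndarray = query_frame(end_ts)

    # pack/inspect the frame BEFORE recursing so the next `end_ts`
    # comes from the first *delivered* timestamp instead of a naive
    # `end_ts - FRAME_DURATION_S` guess (which skips over gaps).
    if times[-1] - times[0] < FRAME_DURATION_S:
        # recurse further back, always forwarding the same sampling
        # period; the real code also bails on the no-data case first.
        prior: np.ndarray = get_bars(end_ts=times[0])
        times = np.concatenate([prior, times])

    return times
```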

Toss in some other helpfuls:
- more explicit `pendulum` typing imports
- some masked out sorted-diffing checks (that can be enabled when
  debugging out-of-order frame issues)
- always error log about less-than time step mismatches since we
  should never have time-diff steps **smaller** than the specified
  `sample_period_s` (see the snippet below)!
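
The gist of that check is a diff over the (sorted) time column flagged
against the sampling period; `check_time_steps()` is a hypothetical
name wrapping the same logic as the diffs below:

```python
import numpy as np

def check_time_steps(
    times: np.ndarray,       # ascending epoch-seconds time column
    sample_period_s: float,  # expected step, eg. 60 for 1m bars
) -> np.ndarray:
    # boolean mask flagging any adjacent-bar time step SMALLER than
    # the sampling period; any `True` entry indicates an
    # overlapping/malformed frame was delivered.
    return np.diff(times) < sample_period_s
```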
distribute_dis
Tyler Goodlet 2023-12-12 16:19:21 -05:00
parent b03eceebef
commit 8e4d1a48ed
3 changed files with 93 additions and 34 deletions


@@ -49,7 +49,12 @@ from bidict import bidict
 import trio
 import tractor
 from tractor import to_asyncio
-import pendulum
+from pendulum import (
+    from_timestamp,
+    DateTime,
+    Duration,
+    duration as mk_duration,
+)
 from eventkit import Event
 from ib_insync import (
     client as ib_client,
@@ -221,16 +226,20 @@ def bars_to_np(bars: list) -> np.ndarray:
 # https://interactivebrokers.github.io/tws-api/historical_limitations.html#non-available_hd
 _samplings: dict[int, tuple[str, str]] = {
     1: (
+        # ib strs
         '1 secs',
         f'{int(2e3)} S',
-        pendulum.duration(seconds=2e3),
+
+        mk_duration(seconds=2e3),
     ),

     # TODO: benchmark >1 D duration on query to see if
     # throughput can be made faster during backfilling.
     60: (
+        # ib strs
         '1 min',
         '2 D',
-        pendulum.duration(days=2),
+
+        mk_duration(days=2),
     ),
 }
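
For reference, each `_samplings` entry now carries ib's bar-size and
duration strings plus an equivalent `pendulum` `Duration` for gap
math, unpacked like so in `Client.bars()` below:

```python
# eg. the 1m entry: ib bar-size str, ib duration str, and the
# default frame duration as a pendulum `Duration`
bar_size, ib_duration_str, default_dt_duration = _samplings[60]
# -> '1 min', '2 D', mk_duration(days=2)
```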
@@ -315,7 +324,7 @@ class Client:
         **kwargs,

-    ) -> tuple[BarDataList, np.ndarray, pendulum.Duration]:
+    ) -> tuple[BarDataList, np.ndarray, Duration]:
         '''
         Retreive OHLCV bars for a fqme over a range to the present.
@@ -324,11 +333,20 @@ class Client:
         # https://interactivebrokers.github.io/tws-api/historical_data.html
         bars_kwargs = {'whatToShow': 'TRADES'}
         bars_kwargs.update(kwargs)

-        bar_size, duration, dt_duration = _samplings[sample_period_s]
+        (
+            bar_size,
+            ib_duration_str,
+            default_dt_duration,
+        ) = _samplings[sample_period_s]
+
+        dt_duration: DateTime = (
+            duration
+            or default_dt_duration
+        )

         global _enters
         log.info(
-            f"REQUESTING {duration}'s worth {bar_size} BARS\n"
+            f"REQUESTING {ib_duration_str}'s worth {bar_size} BARS\n"
             f'{_enters} @ end={end_dt}"'
         )
@@ -353,7 +371,7 @@ class Client:
             # time history length values format:
             # ``durationStr=integer{SPACE}unit (S|D|W|M|Y)``
-            durationStr=duration,
+            durationStr=ib_duration_str,

             # always use extended hours
             useRTH=False,
@@ -383,29 +401,55 @@ class Client:
         # => we recursively call this method until we get at least
         # as many bars such that they sum in aggregate to the the
         # desired total time (duration) at most.
-        elif (
-            end_dt
-            and (
-                (len(bars) * sample_period_s) < dt_duration.in_seconds()
-            )
-        ):
-            log.warning(
-                f'Recursing to get more bars from {end_dt} for {dt_duration}'
-            )
-            end_dt -= dt_duration
-            (
-                r_bars,
-                r_arr,
-                r_duration,
-            ) = await self.bars(
-                fqme,
-                start_dt=start_dt,
-                end_dt=end_dt,
-            )
-            r_bars.extend(bars)
-            bars = r_bars
+        if end_dt:
+            nparr: np.ndarray = bars_to_np(bars)
+            times: np.ndarray = nparr['time']
+            first: float = times[0]
+            tdiff: float = times[-1] - first
+
+            if (
+                # len(bars) * sample_period_s) < dt_duration.in_seconds()
+                tdiff < dt_duration.in_seconds()
+            ):
+                end_dt: DateTime = from_timestamp(first)
+                log.warning(
+                    f'Frame result was shorter then {dt_duration}!?\n'
+                    'Recursing for more bars:\n'
+                    f'end_dt: {end_dt}\n'
+                    f'dt_duration: {dt_duration}\n'
+                )
+                (
+                    r_bars,
+                    r_arr,
+                    r_duration,
+                ) = await self.bars(
+                    fqme,
+                    start_dt=start_dt,
+                    end_dt=end_dt,
+                    sample_period_s=sample_period_s,
+
+                    # TODO: make a table for Duration to
+                    # the ib str values in order to use this?
+                    # duration=duration,
+                )
+                r_bars.extend(bars)
+                bars = r_bars
+
         nparr = bars_to_np(bars)
+
+        # timestep should always be at least as large as the
+        # period step.
+        tdiff: np.ndarray = np.diff(nparr['time'])
+        to_short: np.ndarray = tdiff < sample_period_s
+        if (to_short).any():
+            # raise ValueError(
+            log.error(
+                f'OHLC frame for {sample_period_s} has {to_short.size} '
+                'time steps which are shorter then expected?!"'
+            )
+            # OOF: this will break teardown?
+            # breakpoint()
+
         return bars, nparr, dt_duration

     async def con_deats(
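
With the above, a caller requesting 1m history gets back the packed
array and the effective frame duration, with any short-frame
recursion handled internally; a hypothetical call shape (the fqme is
made up, `client`/`end_dt` assumed in scope):

```python
# hypothetical usage; `client` is a connected `Client` instance and
# `end_dt` a pendulum `DateTime` anchoring the query
bars, nparr, dt_duration = await client.bars(
    'mnq.cme.ib',        # made-up fqme for illustration
    end_dt=end_dt,
    sample_period_s=60,  # 1m sampling -> '1 min' / '2 D' per table
)
```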


@@ -20,7 +20,7 @@ Order and trades endpoints for use with ``piker``'s EMS.
 """
 from __future__ import annotations
 from contextlib import ExitStack
-from collections import ChainMap
+# from collections import ChainMap
 from functools import partial
 from pprint import pformat
 import time


@@ -196,10 +196,8 @@ async def open_history_client(
             f'mean: {mean}'
         )

-        if (
-            out is None
-        ):
-            # could be trying to retreive bars over weekend
+        # could be trying to retreive bars over weekend
+        if out is None:
             log.error(f"Can't grab bars starting at {end_dt}!?!?")
             raise NoData(
                 f'{end_dt}',
@@ -213,7 +211,24 @@ async def open_history_client(
         ):
             raise DataUnavailable(f'First timestamp is {head_dt}')

-        bars, bars_array, first_dt, last_dt = out
+        # also see return type for `get_bars()`
+        bars: ibis.objects.BarDataList
+        bars_array: np.ndarray
+        first_dt: datetime
+        last_dt: datetime
+        (
+            bars,
+            bars_array,
+            first_dt,
+            last_dt,
+        ) = out
+
+        # TODO: audit the sampling period here as well?
+        # timestep should always be at least as large as the
+        # period step.
+        # tdiff: np.ndarray = np.diff(bars_array['time'])
+        # if (tdiff < timeframe).any():
+        #     await tractor.pause()

         # volume cleaning since there's -ve entries,
         # wood luv to know what crookery that is..