ib: don't bother with recursive not-enough-bars queries for now, causes more problems then it solves..

distribute_dis
Tyler Goodlet 2023-12-15 13:56:42 -05:00
parent 97e2403fb1
commit ba154ef413
2 changed files with 88 additions and 40 deletions

View File

@ -401,7 +401,15 @@ class Client:
# => we recursively call this method until we get at least
# as many bars such that they sum in aggregate to the the
# desired total time (duration) at most.
if end_dt:
# XXX XXX XXX
# WHY DID WE EVEN NEED THIS ORIGINALLY!?
# XXX XXX XXX
# - if you query over a gap and get no data
# that may short circuit the history
if (
end_dt
and False
):
nparr: np.ndarray = bars_to_np(bars)
times: np.ndarray = nparr['time']
first: float = times[0]
@ -410,6 +418,7 @@ class Client:
if (
# len(bars) * sample_period_s) < dt_duration.in_seconds()
tdiff < dt_duration.in_seconds()
# and False
):
end_dt: DateTime = from_timestamp(first)
log.warning(
@ -859,6 +868,9 @@ class Client:
timeout=timeout,
)
except TimeoutError:
import pdbp
pdbp.set_trace()
if raise_on_timeout:
raise
return None

View File

@ -174,8 +174,15 @@ async def open_history_client(
start_dt: datetime | None = None,
) -> tuple[np.ndarray, str]:
nonlocal max_timeout, mean, count
if (
start_dt
and start_dt.timestamp() == 0
):
await tractor.pause()
query_start = time.time()
out, timedout = await get_bars(
proxy,
@ -403,35 +410,55 @@ async def get_bars(
bars, bars_array, dt_duration = out
# not enough bars signal, likely due to venue
# operational gaps.
too_little: bool = False
if (
end_dt
and (
not bars
or (too_little :=
start_dt
and (len(bars) * timeframe)
< dt_duration.in_seconds()
)
)
):
if (
end_dt
or too_little
):
log.warning(
f'History is blank for {dt_duration} from {end_dt}'
)
end_dt -= dt_duration
continue
raise NoData(f'{end_dt}')
if bars_array is None:
raise SymbolNotFound(fqme)
# not enough bars signal, likely due to venue
# operational gaps.
# too_little: bool = False
if end_dt:
if not bars:
# no data returned?
log.warning(
'History frame is blank?\n'
f'start_dt: {start_dt}\n'
f'end_dt: {end_dt}\n'
f'duration: {dt_duration}\n'
)
raise NoData(f'{end_dt}')
else:
dur_s: float = len(bars) * timeframe
bars_dur = pendulum.Duration(seconds=dur_s)
dt_dur_s: float = dt_duration.in_seconds()
if dur_s < dt_dur_s:
log.warning(
'History frame is shorter then expected?\n'
f'start_dt: {start_dt}\n'
f'end_dt: {end_dt}\n'
f'duration: {dt_dur_s}\n'
f'frame duration seconds: {dur_s}\n'
f'dur diff: {dt_duration - bars_dur}\n'
)
# NOTE: we used to try to get a minimal
# set of bars by recursing but this ran
# into possible infinite query loops
# when logic in the `Client.bars()` dt
# diffing went bad. So instead for now
# we just return the
# shorter-then-expected history with
# a warning.
# TODO: in the future it prolly makes
# the most send to do venue operating
# hours lookup and
# timestamp-in-operating-range set
# checking to know for sure if we can
# safely and quickly ignore non-uniform history
# frame timestamp gaps..
# end_dt -= dt_duration
# continue
# await tractor.pause()
first_dt = pendulum.from_timestamp(
bars[0].date.timestamp())
@ -854,7 +881,13 @@ async def stream_quotes(
init_msgs.append(init_msg)
con: Contract = details.contract
first_ticker: Ticker = await proxy.get_quote(contract=con)
first_ticker: Ticker | None = None
with trio.move_on_after(1):
first_ticker: Ticker = await proxy.get_quote(
contract=con,
raise_on_timeout=False,
)
if first_ticker:
first_quote: dict = normalize(first_ticker)
log.info(
@ -862,18 +895,6 @@ async def stream_quotes(
f'{pformat(first_quote)}'
)
# TODO: we should instead spawn a task that waits on a feed
# to start and let it wait indefinitely..instead of this
# hard coded stuff.
# async def wait_for_first_quote():
# with trio.CancelScope() as cs:
with trio.move_on_after(1):
first_ticker = await proxy.get_quote(
contract=con,
raise_on_timeout=True,
)
# NOTE: it might be outside regular trading hours for
# assets with "standard venue operating hours" so we
# only "pretend the feed is live" when the dst asset
@ -884,6 +905,8 @@ async def stream_quotes(
# (equitiies, futes, bonds etc.) we at least try to
# grab the OHLC history.
if (
first_ticker
and
isnan(first_ticker.last)
# SO, if the last quote price value is NaN we ONLY
# "pretend to do" `feed_is_live.set()` if it's a known
@ -907,6 +930,19 @@ async def stream_quotes(
await trio.sleep_forever()
return # we never expect feed to come up?
# TODO: we should instead spawn a task that waits on a feed
# to start and let it wait indefinitely..instead of this
# hard coded stuff.
# async def wait_for_first_quote():
# with trio.CancelScope() as cs:
# XXX: MUST acquire a ticker + first quote before starting
# the live quotes loop!
# with trio.move_on_after(1):
first_ticker = await proxy.get_quote(
contract=con,
raise_on_timeout=True,
)
cs: trio.CancelScope | None = None
startup: bool = True
while (