Raise `DataUnavailable` on >= 6 no data error events

ib_1m_hist
Tyler Goodlet 2022-10-27 10:24:15 -04:00
parent 610fb5f7c6
commit d5b357b69a
1 changed files with 34 additions and 15 deletions

View File

@ -107,7 +107,7 @@ async def open_data_client() -> MethodProxy:
@acm @acm
async def open_history_client( async def open_history_client(
symbol: str, fqsn: str,
) -> tuple[Callable, int]: ) -> tuple[Callable, int]:
''' '''
@ -130,7 +130,7 @@ async def open_history_client(
mean: float = 0 mean: float = 0
count: int = 0 count: int = 0
head_dt = await proxy.get_head_time(fqsn=symbol) head_dt = await proxy.get_head_time(fqsn=fqsn)
async def get_hist( async def get_hist(
timeframe: float, timeframe: float,
@ -143,7 +143,7 @@ async def open_history_client(
query_start = time.time() query_start = time.time()
out, timedout = await get_bars( out, timedout = await get_bars(
proxy, proxy,
symbol, fqsn,
timeframe, timeframe,
end_dt=end_dt, end_dt=end_dt,
) )
@ -169,7 +169,9 @@ async def open_history_client(
# frame_size=2000, # frame_size=2000,
) )
if end_dt and end_dt <= head_dt: if (
end_dt and end_dt <= head_dt
):
raise DataUnavailable(f'First timestamp is {head_dt}') raise DataUnavailable(f'First timestamp is {head_dt}')
bars, bars_array, first_dt, last_dt = out bars, bars_array, first_dt, last_dt = out
@ -277,8 +279,14 @@ async def get_bars(
# blank to start which tells ib to look up the latest datum # blank to start which tells ib to look up the latest datum
end_dt: str = '', end_dt: str = '',
# TODO: make this more dynamic based on measured frame rx latency.. # TODO: make this more dynamic based on measured frame rx latency?
timeout: float = 3, # how long before we trigger a feed reset # how long before we trigger a feed reset (seconds)
feed_reset_timeout: float = 3,
# how many days to subtract before giving up on further
# history queries for instrument, presuming that most don't
# not trade for a week XD
max_nodatas: int = 6,
task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED, task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,
@ -289,6 +297,7 @@ async def get_bars(
''' '''
global _data_resetter_task global _data_resetter_task
nodatas_count: int = 0
data_cs: trio.CancelScope | None = None data_cs: trio.CancelScope | None = None
result: tuple[ result: tuple[
@ -300,7 +309,7 @@ async def get_bars(
result_ready = trio.Event() result_ready = trio.Event()
async def query(): async def query():
nonlocal result, data_cs, end_dt nonlocal result, data_cs, end_dt, nodatas_count
while True: while True:
try: try:
out = await proxy.bars( out = await proxy.bars(
@ -362,24 +371,34 @@ async def get_bars(
) )
elif err.code == 162: elif err.code == 162:
if 'HMDS query returned no data' in err.message: if (
'HMDS query returned no data' in msg
):
# XXX: this is now done in the storage mgmt # XXX: this is now done in the storage mgmt
# layer and we shouldn't implicitly decrement # layer and we shouldn't implicitly decrement
# the frame dt index since the upper layer may # the frame dt index since the upper layer may
# be doing so concurrently and we don't want to # be doing so concurrently and we don't want to
# be delivering frames that weren't asked for. # be delivering frames that weren't asked for.
log.warning(
f'NO DATA found ending @ {end_dt}\n'
)
# try to decrement start point and look further back # try to decrement start point and look further back
# end_dt = end_dt.subtract(seconds=2000) # end_dt = end_dt.subtract(seconds=2000)
logmsg = "SUBTRACTING DAY from DT index"
if end_dt is not None: if end_dt is not None:
end_dt = end_dt.subtract(days=1) end_dt = end_dt.subtract(days=1)
print("SUBTRACTING DAY") elif end_dt is None:
else:
end_dt = pendulum.now().subtract(days=1) end_dt = pendulum.now().subtract(days=1)
log.warning(
f'NO DATA found ending @ {end_dt}\n'
+ logmsg
)
if nodatas_count >= max_nodatas:
raise DataUnavailable(
f'Presuming {fqsn} has no further history '
f'after {max_nodatas} tries..'
)
nodatas_count += 1
continue continue
elif 'API historical data query cancelled' in err.message: elif 'API historical data query cancelled' in err.message:
@ -434,7 +453,7 @@ async def get_bars(
# for a result before triggering a data feed reset. # for a result before triggering a data feed reset.
while not result_ready.is_set(): while not result_ready.is_set():
with trio.move_on_after(timeout): with trio.move_on_after(feed_reset_timeout):
await result_ready.wait() await result_ready.wait()
break break