Make history routines `timeframe` aware

Allow data feed sub-system to specify the timeframe (aka OHLC sample
period) to the `open_history_client()` delivered history fetching API.
Factor the data keycombo hack into a new routine to be used also from
the history backfiller code when request latency increases; there is
a first draft at trying to use the feed reset to speed up 1m frame
throttling by timing out on the history frame response, but it needs
a lot of fine tuning.
ib_1m_hist
Tyler Goodlet 2022-09-15 11:57:07 -04:00
parent 220981e718
commit 2a866dde65
1 changed files with 198 additions and 147 deletions

View File

@ -114,12 +114,18 @@ async def open_history_client(
async with open_data_client() as proxy: async with open_data_client() as proxy:
async def get_hist( async def get_hist(
timeframe: float,
end_dt: Optional[datetime] = None, end_dt: Optional[datetime] = None,
start_dt: Optional[datetime] = None, start_dt: Optional[datetime] = None,
) -> tuple[np.ndarray, str]: ) -> tuple[np.ndarray, str]:
out, fails = await get_bars(proxy, symbol, end_dt=end_dt) out, fails = await get_bars(
proxy,
symbol,
timeframe,
end_dt=end_dt,
)
# TODO: add logic here to handle tradable hours and only grab # TODO: add logic here to handle tradable hours and only grab
# valid bars in the range # valid bars in the range
@ -145,7 +151,7 @@ async def open_history_client(
# quite sure why.. needs some tinkering and probably # quite sure why.. needs some tinkering and probably
# a lookthrough of the ``ib_insync`` machinery, for eg. maybe # a lookthrough of the ``ib_insync`` machinery, for eg. maybe
# we have to do the batch queries on the `asyncio` side? # we have to do the batch queries on the `asyncio` side?
yield get_hist, {'erlangs': 1, 'rate': 6} yield get_hist, {'erlangs': 1, 'rate': 3}
_pacing: str = ( _pacing: str = (
@ -154,14 +160,99 @@ _pacing: str = (
) )
async def wait_on_data_reset(
proxy: MethodProxy,
tries: int = 2,
timeout: float = 16,
task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,
):
# TODO: we might have to put a task lock around this
# method..
hist_ev = proxy.status_event(
'HMDS data farm connection is OK:ushmds'
)
# XXX: other event messages we might want to try and
# wait for but i wasn't able to get any of this
# reliable..
# reconnect_start = proxy.status_event(
# 'Market data farm is connecting:usfuture'
# )
# live_ev = proxy.status_event(
# 'Market data farm connection is OK:usfuture'
# )
# try to wait on the reset event(s) to arrive, a timeout
# will trigger a retry up to 6 times (for now).
# try 3 time with a data reset then fail over to
# a connection reset.
for i in range(1, tries):
log.warning('Sending DATA RESET request')
await data_reset_hack(reset_type='data')
task_status.started()
with trio.move_on_after(timeout) as cs:
for name, ev in [
# TODO: not sure if waiting on other events
# is all that useful here or not. in theory
# you could wait on one of the ones above
# first to verify the reset request was
# sent?
('history', hist_ev),
]:
await ev.wait()
log.info(f"{name} DATA RESET")
break
if cs.cancelled_caught:
# fails += 1
log.warning(
f'Data reset {name} timeout, retrying {i}.'
)
continue
else:
log.warning('Sending CONNECTION RESET')
res = await data_reset_hack(reset_type='connection')
if not res:
log.warning(
'NO VNC DETECTED!\n'
'Manually press ctrl-alt-f on your IB java app'
)
# break
with trio.move_on_after(timeout) as cs:
for name, ev in [
# TODO: not sure if waiting on other events
# is all that useful here or not. in theory
# you could wait on one of the ones above
# first to verify the reset request was
# sent?
('history', hist_ev),
]:
await ev.wait()
log.info(f"{name} DATA RESET")
if cs.cancelled_caught:
# fails += 1
log.warning('Data CONNECTION RESET timeout!?')
async def get_bars( async def get_bars(
proxy: MethodProxy, proxy: MethodProxy,
fqsn: str, fqsn: str,
period: float,
# blank to start which tells ib to look up the latest datum # blank to start which tells ib to look up the latest datum
end_dt: str = '', end_dt: str = '',
task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,
) -> (dict, np.ndarray): ) -> (dict, np.ndarray):
''' '''
Retrieve historical data from a ``trio``-side task using Retrieve historical data from a ``trio``-side task using
@ -176,157 +267,111 @@ async def get_bars(
if end_dt: if end_dt:
last_dt = pendulum.from_timestamp(end_dt.timestamp()) last_dt = pendulum.from_timestamp(end_dt.timestamp())
for _ in range(10): timeout: float = float('inf')
try: async with trio.open_nursery() as nurse:
out = await proxy.bars( for _ in range(10):
fqsn=fqsn, try:
end_dt=end_dt, out = None
) with trio.move_on_after(timeout) as cs:
if out: out = await proxy.bars(
bars, bars_array = out fqsn=fqsn,
end_dt=end_dt,
sample_period_s=period,
)
timeout = 3
else: if (
await tractor.breakpoint() cs.cancelled_caught
and out is None
):
print(f"RESETTING DATA after {timeout}")
await nurse.start(
wait_on_data_reset,
proxy,
timeout=float('inf'),
tries=100,
)
# scale timeout up exponentially to avoid
# request-overruning the history farm.
# timeout *= 2
continue
if bars_array is None: if out:
raise SymbolNotFound(fqsn) bars, bars_array = out
first_dt = pendulum.from_timestamp(
bars[0].date.timestamp())
last_dt = pendulum.from_timestamp(
bars[-1].date.timestamp())
time = bars_array['time']
assert time[-1] == last_dt.timestamp()
assert time[0] == first_dt.timestamp()
log.info(
f'{len(bars)} bars retreived for {first_dt} -> {last_dt}'
)
return (bars, bars_array, first_dt, last_dt), fails
except RequestError as err:
msg = err.message
if 'No market data permissions for' in msg:
# TODO: signalling for no permissions searches
raise NoData(
f'Symbol: {fqsn}',
)
elif (
err.code == 162 and
'HMDS query returned no data' in err.message
):
# XXX: this is now done in the storage mgmt layer
# and we shouldn't implicitly decrement the frame dt
# index since the upper layer may be doing so
# concurrently and we don't want to be delivering frames
# that weren't asked for.
log.warning(
f'NO DATA found ending @ {end_dt}\n'
)
# try to decrement start point and look further back
# end_dt = last_dt = last_dt.subtract(seconds=2000)
raise NoData(
f'Symbol: {fqsn}',
frame_size=2000,
)
# elif (
# err.code == 162 and
# 'Trading TWS session is connected from a different IP
# address' in err.message
# ):
# log.warning("ignoring ip address warning")
# continue
elif _pacing in msg:
log.warning(
'History throttle rate reached!\n'
'Resetting farms with `ctrl-alt-f` hack\n'
)
# TODO: we might have to put a task lock around this
# method..
hist_ev = proxy.status_event(
'HMDS data farm connection is OK:ushmds'
)
# XXX: other event messages we might want to try and
# wait for but i wasn't able to get any of this
# reliable..
# reconnect_start = proxy.status_event(
# 'Market data farm is connecting:usfuture'
# )
# live_ev = proxy.status_event(
# 'Market data farm connection is OK:usfuture'
# )
# try to wait on the reset event(s) to arrive, a timeout
# will trigger a retry up to 6 times (for now).
tries: int = 2
timeout: float = 10
# try 3 time with a data reset then fail over to
# a connection reset.
for i in range(1, tries):
log.warning('Sending DATA RESET request')
await data_reset_hack(reset_type='data')
with trio.move_on_after(timeout) as cs:
for name, ev in [
# TODO: not sure if waiting on other events
# is all that useful here or not. in theory
# you could wait on one of the ones above
# first to verify the reset request was
# sent?
('history', hist_ev),
]:
await ev.wait()
log.info(f"{name} DATA RESET")
break
if cs.cancelled_caught:
fails += 1
log.warning(
f'Data reset {name} timeout, retrying {i}.'
)
continue
else: else:
raise NoData(
f'{end_dt}',
# frame_size=2000,
)
log.warning('Sending CONNECTION RESET') if bars_array is None:
res = await data_reset_hack(reset_type='connection') raise SymbolNotFound(fqsn)
if not res:
log.warning(
'NO VNC DETECTED!\n'
'Manually press ctrl-alt-f on your IB java app'
)
# break
with trio.move_on_after(timeout) as cs: first_dt = pendulum.from_timestamp(
for name, ev in [ bars[0].date.timestamp())
# TODO: not sure if waiting on other events
# is all that useful here or not. in theory
# you could wait on one of the ones above
# first to verify the reset request was
# sent?
('history', hist_ev),
]:
await ev.wait()
log.info(f"{name} DATA RESET")
if cs.cancelled_caught: last_dt = pendulum.from_timestamp(
fails += 1 bars[-1].date.timestamp())
log.warning('Data CONNECTION RESET timeout!?')
else: time = bars_array['time']
raise assert time[-1] == last_dt.timestamp()
assert time[0] == first_dt.timestamp()
log.info(
f'{len(bars)} bars retreived for {first_dt} -> {last_dt}'
)
return (bars, bars_array, first_dt, last_dt), fails
except RequestError as err:
msg = err.message
if 'No market data permissions for' in msg:
# TODO: signalling for no permissions searches
raise NoData(
f'Symbol: {fqsn}',
)
elif (
err.code == 162 and
'HMDS query returned no data' in err.message
):
# XXX: this is now done in the storage mgmt layer
# and we shouldn't implicitly decrement the frame dt
# index since the upper layer may be doing so
# concurrently and we don't want to be delivering frames
# that weren't asked for.
log.warning(
f'NO DATA found ending @ {end_dt}\n'
)
# try to decrement start point and look further back
# end_dt = last_dt = last_dt.subtract(seconds=2000)
raise NoData(
f'Symbol: {fqsn}',
# TODO: fix this since we don't strictly use 1s
# ohlc any more XD
frame_size=2000,
)
# elif (
# err.code == 162 and
# 'Trading TWS session is connected from a different IP
# address' in err.message
# ):
# log.warning("ignoring ip address warning")
# continue
elif _pacing in msg:
log.warning(
'History throttle rate reached!\n'
'Resetting farms with `ctrl-alt-f` hack\n'
)
await wait_on_data_reset(proxy)
else:
raise
return None, None return None, None
# else: # throttle wasn't fixed so error out immediately # else: # throttle wasn't fixed so error out immediately
@ -337,6 +382,7 @@ async def backfill_bars(
fqsn: str, fqsn: str,
shm: ShmArray, # type: ignore # noqa shm: ShmArray, # type: ignore # noqa
timeframe: float = 1, # in seconds
# TODO: we want to avoid overrunning the underlying shm array buffer # TODO: we want to avoid overrunning the underlying shm array buffer
# and we should probably calc the number of calls to make depending # and we should probably calc the number of calls to make depending
@ -362,7 +408,7 @@ async def backfill_bars(
async with open_data_client() as proxy: async with open_data_client() as proxy:
out, fails = await get_bars(proxy, fqsn) out, fails = await get_bars(proxy, fqsn, timeframe)
if out is None: if out is None:
raise RuntimeError("Could not pull currrent history?!") raise RuntimeError("Could not pull currrent history?!")
@ -380,7 +426,12 @@ async def backfill_bars(
i = 0 i = 0
while i < count: while i < count:
out, fails = await get_bars(proxy, fqsn, end_dt=first_dt) out, fails = await get_bars(
proxy,
fqsn,
timeframe,
end_dt=first_dt,
)
if out is None: if out is None:
# could be trying to retreive bars over weekend # could be trying to retreive bars over weekend