Make history routines `timeframe` aware
Allow the data feed sub-system to specify the timeframe (aka the OHLC sample period) to the history-fetching API delivered by `open_history_client()`. Factor the data keycombo hack into a new routine, `wait_on_data_reset()`, so it can also be used from the history backfiller code when request latency increases; there is a first draft of using the feed reset to speed up 1m frame throttling by timing out on the history frame response, but it needs a lot of fine tuning.
parent 1adf5fb9c0
commit 87f7a03dbe
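
As a usage sketch (illustrative, not part of this commit): `open_history_client()` yields a `get_hist` callable plus a throttling config, and `get_hist` now takes the OHLC sample period in seconds as its leading `timeframe` argument. The caller below and the unpacking of the return value are assumptions; only the signatures and the config shape come from the diff that follows.

    from datetime import datetime
    from typing import Optional

    import numpy as np


    async def pull_latest_1m_frame(
        open_history_client,  # the feed backend's history client ctx mngr
        symbol: str,
    ) -> np.ndarray:
        # hypothetical caller, not code from this commit
        async with open_history_client(symbol) as (get_hist, config):
            # `config` carries request throttling hints,
            # eg. {'erlangs': 1, 'rate': 3} per the diff below.
            end_dt: Optional[datetime] = None  # blank => latest datum
            array, _ = await get_hist(
                60,  # timeframe: OHLC sample period in seconds (1m)
                end_dt=end_dt,
                start_dt=None,
            )
            return array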
@@ -114,12 +114,18 @@ async def open_history_client(
     async with open_data_client() as proxy:
 
         async def get_hist(
+            timeframe: float,
             end_dt: Optional[datetime] = None,
             start_dt: Optional[datetime] = None,
 
         ) -> tuple[np.ndarray, str]:
 
-            out, fails = await get_bars(proxy, symbol, end_dt=end_dt)
+            out, fails = await get_bars(
+                proxy,
+                symbol,
+                timeframe,
+                end_dt=end_dt,
+            )
 
             # TODO: add logic here to handle tradable hours and only grab
             # valid bars in the range
@@ -145,7 +151,7 @@ async def open_history_client(
         # quite sure why.. needs some tinkering and probably
         # a lookthrough of the ``ib_insync`` machinery, for eg. maybe
         # we have to do the batch queries on the `asyncio` side?
-        yield get_hist, {'erlangs': 1, 'rate': 6}
+        yield get_hist, {'erlangs': 1, 'rate': 3}
 
 
 _pacing: str = (
@@ -154,14 +160,99 @@ _pacing: str = (
 )
 
 
+async def wait_on_data_reset(
+    proxy: MethodProxy,
+    tries: int = 2,
+    timeout: float = 16,
+
+    task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,
+
+):
+
+    # TODO: we might have to put a task lock around this
+    # method..
+    hist_ev = proxy.status_event(
+        'HMDS data farm connection is OK:ushmds'
+    )
+
+    # XXX: other event messages we might want to try and
+    # wait for but i wasn't able to get any of this
+    # reliable..
+    # reconnect_start = proxy.status_event(
+    #     'Market data farm is connecting:usfuture'
+    # )
+    # live_ev = proxy.status_event(
+    #     'Market data farm connection is OK:usfuture'
+    # )
+    # try to wait on the reset event(s) to arrive, a timeout
+    # will trigger a retry up to 6 times (for now).
+
+    # try 3 times with a data reset then fail over to
+    # a connection reset.
+    for i in range(1, tries):
+
+        log.warning('Sending DATA RESET request')
+        await data_reset_hack(reset_type='data')
+        task_status.started()
+
+        with trio.move_on_after(timeout) as cs:
+            for name, ev in [
+                # TODO: not sure if waiting on other events
+                # is all that useful here or not. in theory
+                # you could wait on one of the ones above
+                # first to verify the reset request was
+                # sent?
+                ('history', hist_ev),
+            ]:
+                await ev.wait()
+                log.info(f"{name} DATA RESET")
+                break
+
+        if cs.cancelled_caught:
+            # fails += 1
+            log.warning(
+                f'Data reset {name} timeout, retrying {i}.'
+            )
+
+            continue
+    else:
+
+        log.warning('Sending CONNECTION RESET')
+        res = await data_reset_hack(reset_type='connection')
+        if not res:
+            log.warning(
+                'NO VNC DETECTED!\n'
+                'Manually press ctrl-alt-f on your IB java app'
+            )
+            # break
+
+        with trio.move_on_after(timeout) as cs:
+            for name, ev in [
+                # TODO: not sure if waiting on other events
+                # is all that useful here or not. in theory
+                # you could wait on one of the ones above
+                # first to verify the reset request was
+                # sent?
+                ('history', hist_ev),
+            ]:
+                await ev.wait()
+                log.info(f"{name} DATA RESET")
+
+        if cs.cancelled_caught:
+            # fails += 1
+            log.warning('Data CONNECTION RESET timeout!?')
+
+
 async def get_bars(
 
     proxy: MethodProxy,
     fqsn: str,
+    period: float,
+
     # blank to start which tells ib to look up the latest datum
     end_dt: str = '',
 
+    task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,
+
 ) -> (dict, np.ndarray):
     '''
     Retrieve historical data from a ``trio``-side task using
@@ -176,157 +267,111 @@ async def get_bars(
     if end_dt:
         last_dt = pendulum.from_timestamp(end_dt.timestamp())
 
-    for _ in range(10):
-        try:
-            out = await proxy.bars(
-                fqsn=fqsn,
-                end_dt=end_dt,
-            )
-            if out:
-                bars, bars_array = out
-
-            else:
-                await tractor.breakpoint()
-
-            if bars_array is None:
-                raise SymbolNotFound(fqsn)
-
-            first_dt = pendulum.from_timestamp(
-                bars[0].date.timestamp())
-
-            last_dt = pendulum.from_timestamp(
-                bars[-1].date.timestamp())
-
-            time = bars_array['time']
-            assert time[-1] == last_dt.timestamp()
-            assert time[0] == first_dt.timestamp()
-            log.info(
-                f'{len(bars)} bars retrieved for {first_dt} -> {last_dt}'
-            )
-
-            return (bars, bars_array, first_dt, last_dt), fails
-
-        except RequestError as err:
-            msg = err.message
-
-            if 'No market data permissions for' in msg:
-                # TODO: signalling for no permissions searches
-                raise NoData(
-                    f'Symbol: {fqsn}',
-                )
-
-            elif (
-                err.code == 162 and
-                'HMDS query returned no data' in err.message
-            ):
-                # XXX: this is now done in the storage mgmt layer
-                # and we shouldn't implicitly decrement the frame dt
-                # index since the upper layer may be doing so
-                # concurrently and we don't want to be delivering frames
-                # that weren't asked for.
-                log.warning(
-                    f'NO DATA found ending @ {end_dt}\n'
-                )
-
-                # try to decrement start point and look further back
-                # end_dt = last_dt = last_dt.subtract(seconds=2000)
-
-                raise NoData(
-                    f'Symbol: {fqsn}',
-                    frame_size=2000,
-                )
-
-            # elif (
-            #     err.code == 162 and
-            #     'Trading TWS session is connected from a different IP
-            #     address' in err.message
-            # ):
-            #     log.warning("ignoring ip address warning")
-            #     continue
-
-            elif _pacing in msg:
-
-                log.warning(
-                    'History throttle rate reached!\n'
-                    'Resetting farms with `ctrl-alt-f` hack\n'
-                )
-                # TODO: we might have to put a task lock around this
-                # method..
-                hist_ev = proxy.status_event(
-                    'HMDS data farm connection is OK:ushmds'
-                )
-
-                # XXX: other event messages we might want to try and
-                # wait for but i wasn't able to get any of this
-                # reliable..
-                # reconnect_start = proxy.status_event(
-                #     'Market data farm is connecting:usfuture'
-                # )
-                # live_ev = proxy.status_event(
-                #     'Market data farm connection is OK:usfuture'
-                # )
-
-                # try to wait on the reset event(s) to arrive, a timeout
-                # will trigger a retry up to 6 times (for now).
-                tries: int = 2
-                timeout: float = 10
-
-                # try 3 times with a data reset then fail over to
-                # a connection reset.
-                for i in range(1, tries):
-
-                    log.warning('Sending DATA RESET request')
-                    await data_reset_hack(reset_type='data')
-
-                    with trio.move_on_after(timeout) as cs:
-                        for name, ev in [
-                            # TODO: not sure if waiting on other events
-                            # is all that useful here or not. in theory
-                            # you could wait on one of the ones above
-                            # first to verify the reset request was
-                            # sent?
-                            ('history', hist_ev),
-                        ]:
-                            await ev.wait()
-                            log.info(f"{name} DATA RESET")
-                            break
-
-                    if cs.cancelled_caught:
-                        fails += 1
-                        log.warning(
-                            f'Data reset {name} timeout, retrying {i}.'
-                        )
-
-                        continue
-                else:
-
-                    log.warning('Sending CONNECTION RESET')
-                    res = await data_reset_hack(reset_type='connection')
-                    if not res:
-                        log.warning(
-                            'NO VNC DETECTED!\n'
-                            'Manually press ctrl-alt-f on your IB java app'
-                        )
-                        # break
-
-                    with trio.move_on_after(timeout) as cs:
-                        for name, ev in [
-                            # TODO: not sure if waiting on other events
-                            # is all that useful here or not. in theory
-                            # you could wait on one of the ones above
-                            # first to verify the reset request was
-                            # sent?
-                            ('history', hist_ev),
-                        ]:
-                            await ev.wait()
-                            log.info(f"{name} DATA RESET")
-
-                    if cs.cancelled_caught:
-                        fails += 1
-                        log.warning('Data CONNECTION RESET timeout!?')
-
-            else:
-                raise
+    timeout: float = float('inf')
+    async with trio.open_nursery() as nurse:
+        for _ in range(10):
+            try:
+                out = None
+                with trio.move_on_after(timeout) as cs:
+                    out = await proxy.bars(
+                        fqsn=fqsn,
+                        end_dt=end_dt,
+                        sample_period_s=period,
+                    )
+                    timeout = 3
+
+                if (
+                    cs.cancelled_caught
+                    and out is None
+                ):
+                    print(f"RESETTING DATA after {timeout}")
+                    await nurse.start(
+                        wait_on_data_reset,
+                        proxy,
+                        timeout=float('inf'),
+                        tries=100,
+                    )
+                    # scale timeout up exponentially to avoid
+                    # request-overrunning the history farm.
+                    # timeout *= 2
+                    continue
+
+                if out:
+                    bars, bars_array = out
+
+                else:
+                    raise NoData(
+                        f'{end_dt}',
+                        # frame_size=2000,
+                    )
+
+                if bars_array is None:
+                    raise SymbolNotFound(fqsn)
+
+                first_dt = pendulum.from_timestamp(
+                    bars[0].date.timestamp())
+
+                last_dt = pendulum.from_timestamp(
+                    bars[-1].date.timestamp())
+
+                time = bars_array['time']
+                assert time[-1] == last_dt.timestamp()
+                assert time[0] == first_dt.timestamp()
+                log.info(
+                    f'{len(bars)} bars retrieved for {first_dt} -> {last_dt}'
+                )
+
+                return (bars, bars_array, first_dt, last_dt), fails
+
+            except RequestError as err:
+                msg = err.message
+
+                if 'No market data permissions for' in msg:
+                    # TODO: signalling for no permissions searches
+                    raise NoData(
+                        f'Symbol: {fqsn}',
+                    )
+
+                elif (
+                    err.code == 162 and
+                    'HMDS query returned no data' in err.message
+                ):
+                    # XXX: this is now done in the storage mgmt layer
+                    # and we shouldn't implicitly decrement the frame dt
+                    # index since the upper layer may be doing so
+                    # concurrently and we don't want to be delivering frames
+                    # that weren't asked for.
+                    log.warning(
+                        f'NO DATA found ending @ {end_dt}\n'
+                    )
+
+                    # try to decrement start point and look further back
+                    # end_dt = last_dt = last_dt.subtract(seconds=2000)
+
+                    raise NoData(
+                        f'Symbol: {fqsn}',
+                        # TODO: fix this since we don't strictly use 1s
+                        # ohlc any more XD
+                        frame_size=2000,
+                    )
+
+                # elif (
+                #     err.code == 162 and
+                #     'Trading TWS session is connected from a different IP
+                #     address' in err.message
+                # ):
+                #     log.warning("ignoring ip address warning")
+                #     continue
+
+                elif _pacing in msg:
+                    log.warning(
+                        'History throttle rate reached!\n'
+                        'Resetting farms with `ctrl-alt-f` hack\n'
+                    )
+                    await wait_on_data_reset(proxy)
+
+                else:
+                    raise
 
     return None, None
     # else:  # throttle wasn't fixed so error out immediately
@@ -337,6 +382,7 @@ async def backfill_bars(
 
     fqsn: str,
     shm: ShmArray,  # type: ignore # noqa
+    timeframe: float = 1,  # in seconds
 
     # TODO: we want to avoid overrunning the underlying shm array buffer
     # and we should probably calc the number of calls to make depending
@@ -362,7 +408,7 @@ async def backfill_bars(
 
     async with open_data_client() as proxy:
 
-        out, fails = await get_bars(proxy, fqsn)
+        out, fails = await get_bars(proxy, fqsn, timeframe)
 
         if out is None:
             raise RuntimeError("Could not pull current history?!")
@@ -380,7 +426,12 @@ async def backfill_bars(
         i = 0
         while i < count:
 
-            out, fails = await get_bars(proxy, fqsn, end_dt=first_dt)
+            out, fails = await get_bars(
+                proxy,
+                fqsn,
+                timeframe,
+                end_dt=first_dt,
+            )
 
             if out is None:
                 # could be trying to retrieve bars over weekend
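
The latency hack drafted in this commit, distilled: start with an unbounded timeout for the first, possibly very slow, frame; tighten it once data flows; and on expiry start the reset in a background task and immediately re-issue the request. A minimal stand-alone `trio` sketch of that pattern, assuming hypothetical `fake_request`/`fake_reset` stand-ins rather than the real `proxy.bars()`/`data_reset_hack()` calls:

    import trio


    async def fake_request(delay: float) -> str:
        # stand-in for the `proxy.bars()` history frame request
        await trio.sleep(delay)
        return 'frame'


    async def fake_reset(
        task_status=trio.TASK_STATUS_IGNORED,
    ) -> None:
        # stand-in for `data_reset_hack()` / `wait_on_data_reset()`
        task_status.started()
        await trio.sleep(0.1)


    async def main() -> None:
        timeout = float('inf')  # first response may take arbitrarily long
        delays = [0.2, 5.0, 0.2]  # the 2nd request simulates a stuck farm
        async with trio.open_nursery() as nurse:
            i = 0
            while i < len(delays):
                out = None
                with trio.move_on_after(timeout) as cs:
                    out = await fake_request(delays[i])
                    timeout = 1  # subsequent frames must arrive quickly

                if cs.cancelled_caught and out is None:
                    # frame response timed out: kick the reset in the
                    # background and immediately retry the same frame.
                    await nurse.start(fake_reset)
                    delays[i] = 0.2  # pretend the reset unstuck the farm
                    continue

                print(f'frame {i}: {out!r}')
                i += 1


    trio.run(main)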