Rework history frame request concurrency

Manual tinker-testing demonstrated that triggering data resets
completely independently of the frame request yields more throughput
and, further, that repeated requests (for the same frame after
cancelling on the `trio`-side) can yield duplicate frame responses.
Rework the dual-task structure so that one task instead waits
indefinitely on the frame response (and thus never triggers duplicate
frames) while the 2nd, data-reset task polls for the first task to
complete in a loop which terminates when the frame's arrival is
signalled via an event.
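
The resulting task layout, as a minimal sketch (the `request_frame()`
and `trigger_data_reset()` helpers here are hypothetical stand-ins for
the real `MethodProxy` calls, not part of this change):

```python
import trio


# hypothetical stand-ins for the real ``MethodProxy`` calls
async def request_frame() -> str:
    await trio.sleep(2)  # pretend the history farm is being slow
    return 'frame'


async def trigger_data_reset() -> None:
    print('sending data reset hack..')


async def get_frame(timeout: float = 1.5):
    result = None
    result_ready = trio.Event()

    async def query():
        # one long-lived request task which is never cancelled so that
        # a re-sent request can't produce duplicate frame responses
        nonlocal result
        result = await request_frame()
        result_ready.set()

    async with trio.open_nursery() as nurse:
        nurse.start_soon(query)

        # parent poll loop: if no frame arrives within ``timeout``,
        # trigger a data reset then go back to waiting on the same
        # outstanding request.
        while not result_ready.is_set():
            with trio.move_on_after(timeout):
                await result_ready.wait()
                continue
            await trigger_data_reset()

    return result


trio.run(get_frame)
```

The actual `get_bars()` version additionally hands back the reset
task's cancel scope so any in-flight reset can be cancelled as soon as
the frame shows up.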

Dirty deatz:
- make `get_bars()` take an optional timeout (which will eventually be
  dynamically passed from the history mgmt machinery) and move the
  request logic inside a new `query()` closure meant to be spawned in
  a task which sets an event on frame arrival; add the data reset poll
  loop in the main/parent task and deliver the result on nursery
  completion.
- handle the frame-request-cancelled event case without crashing.
- on a no-frame result (due to a real history gap) hack in a 1 day
  decrement which we eventually need to let the caller control, likely
  based on measured frame rx latency.
- make `wait_on_data_reset()` a predicate (returning a bool indicating
  reset success) as well as `trio.Nursery.start()` compatible so that
  it can be started in a new task with the started values yielded being
  a cancel scope and completion event (see the sketch after this list).
- drop the legacy `backfill_bars()`, no longer used.
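
A rough sketch of the `.start()`-compatible shape from the second-to-last
bullet (the reset body here just sleeps in place of the real
`data_reset_hack()` call and IB event waits):

```python
import trio


async def wait_on_data_reset(
    timeout: float = 16,
    task_status=trio.TASK_STATUS_IGNORED,
) -> bool:
    done = trio.Event()
    with trio.move_on_after(timeout) as cs:
        # hand the parent a handle to cancel us plus an event it can
        # sync-wait on for completion
        task_status.started((cs, done))

        # stand-in for sending the reset hack and waiting on the
        # ``history`` farm-status event
        await trio.sleep(1)

        done.set()
        return True

    # timed out waiting for the reset to take effect
    done.set()
    return False


async def main():
    async with trio.open_nursery() as nurse:
        data_cs, reset_done = await nurse.start(wait_on_data_reset)
        await reset_done.wait()  # sync wait on reset completion
        # data_cs.cancel()       # ..or cancel an in-flight reset


trio.run(main)
```

In the real task the reset body is the VNC `ctrl-alt-f` hack plus a
wait on the relevant farm-status events.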
ib_1m_hist
Tyler Goodlet 2022-09-28 13:18:15 -04:00
parent 25b90afbdb
commit 7396624be0
1 changed file with 136 additions and 178 deletions


@@ -22,6 +22,7 @@ import asyncio
from contextlib import asynccontextmanager as acm
from dataclasses import asdict
from datetime import datetime
from functools import partial
from math import isnan
import time
from typing import (
@@ -38,7 +39,6 @@ import tractor
import trio
from trio_typing import TaskStatus
from piker.data._sharedmem import ShmArray
from .._util import SymbolNotFound, NoData
from .api import (
# _adhoc_futes_set,
@@ -111,6 +111,15 @@ async def open_history_client(
that takes in ``pendulum.datetime`` and returns ``numpy`` arrays.
'''
# TODO:
# - add logic to handle tradable hours and only grab
# valid bars in the range?
# - we want to avoid overrunning the underlying shm array buffer and
# we should probably calc the number of calls to make depending on
# that until we have the `marketstore` daemon in place in which case
# the shm size will be driven by user config and available sys
# memory.
async with open_data_client() as proxy:
async def get_hist(
@@ -120,21 +129,19 @@ async def open_history_client(
) -> tuple[np.ndarray, str]:
out, fails = await get_bars(
out = await get_bars(
proxy,
symbol,
timeframe,
end_dt=end_dt,
)
# TODO: add logic here to handle tradable hours and only grab
# valid bars in the range
if out is None:
# could be trying to retreive bars over weekend
log.error(f"Can't grab bars starting at {end_dt}!?!?")
raise NoData(
f'{end_dt}',
frame_size=2000,
# frame_size=2000,
)
bars, bars_array, first_dt, last_dt = out
@@ -162,11 +169,16 @@ _pacing: str = (
async def wait_on_data_reset(
proxy: MethodProxy,
tries: int = 2,
reset_type: str = 'data',
timeout: float = 16,
task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,
):
task_status: TaskStatus[
tuple[
trio.CancelScope,
trio.Event,
]
] = trio.TASK_STATUS_IGNORED,
) -> bool:
# TODO: we might have to put a task lock around this
# method..
@@ -186,59 +198,43 @@ async def wait_on_data_reset(
# try to wait on the reset event(s) to arrive, a timeout
# will trigger a retry up to 6 times (for now).
# try 3 time with a data reset then fail over to
# a connection reset.
for i in range(1, tries):
done = trio.Event()
with trio.move_on_after(timeout) as cs:
task_status.started((cs, done))
log.warning('Sending DATA RESET request')
await data_reset_hack(reset_type='data')
res = await data_reset_hack(reset_type=reset_type)
with trio.move_on_after(timeout) as cs:
for name, ev in [
# TODO: not sure if waiting on other events
# is all that useful here or not. in theory
# you could wait on one of the ones above
# first to verify the reset request was
# sent?
('history', hist_ev),
]:
task_status.started(cs)
await ev.wait()
log.info(f"{name} DATA RESET")
break
if (
cs.cancelled_caught
and not cs.cancel_called
):
log.warning(
f'Data reset {name} timeout, retrying {i}.'
)
continue
else:
log.warning('Sending CONNECTION RESET')
res = await data_reset_hack(reset_type='connection')
if not res:
log.warning(
'NO VNC DETECTED!\n'
'Manually press ctrl-alt-f on your IB java app'
)
done.set()
return False
with trio.move_on_after(timeout) as cs:
for name, ev in [
# TODO: not sure if waiting on other events
# is all that useful here or not. in theory
# you could wait on one of the ones above
# first to verify the reset request was
# sent?
('history', hist_ev),
]:
await ev.wait()
log.info(f"{name} DATA RESET")
# TODO: not sure if waiting on other events
# is all that useful here or not.
# - in theory you could wait on one of the ones above first
# to verify the reset request was sent?
# - we need the same for real-time quote feeds which can
# sometimes flake out and stop delivering..
for name, ev in [
('history', hist_ev),
]:
await ev.wait()
log.info(f"{name} DATA RESET")
done.set()
return True
if cs.cancelled_caught:
log.warning('Data CONNECTION RESET timeout!?')
if cs.cancel_called:
log.warning(
'Data reset task canceled?'
)
done.set()
return False
async def get_bars(
@@ -249,6 +245,7 @@ async def get_bars(
# blank to start which tells ib to look up the latest datum
end_dt: str = '',
timeout: float = 1.5, # how long before we trigger a feed reset
task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,
@@ -258,52 +255,44 @@ async def get_bars(
a ``MethoProxy``.
'''
fails = 0
bars: Optional[list] = None
first_dt: datetime = None
last_dt: datetime = None
data_cs: Optional[trio.CancelScope] = None
result: Optional[tuple[
ibis.objects.BarDataList,
np.ndarray,
datetime,
datetime,
]] = None
result_ready = trio.Event()
if end_dt:
last_dt = pendulum.from_timestamp(end_dt.timestamp())
timeout: float = float('inf')
async with trio.open_nursery() as nurse:
for _ in range(10):
async def query():
nonlocal result, data_cs, end_dt
while True:
try:
out = None
with trio.move_on_after(timeout) as cs:
out = await proxy.bars(
fqsn=fqsn,
end_dt=end_dt,
sample_period_s=timeframe,
)
timeout = 3
out = await proxy.bars(
fqsn=fqsn,
end_dt=end_dt,
sample_period_s=timeframe,
if (
cs.cancelled_caught
and out is None
):
print(f"RESETTING DATA after {timeout}")
await nurse.start(
wait_on_data_reset,
proxy,
timeout=float('inf'),
tries=100,
)
# scale timeout up exponentially to avoid
# request-overruning the history farm.
# timeout *= 2
continue
if out:
bars, bars_array = out
else:
# ideally we cancel the request just before we
# cancel on the ``trio``-side and trigger a data
# reset hack.. the problem is there's no way (with
# current impl) to detect a cancel case.
# timeout=timeout,
)
if out is None:
raise NoData(
f'{end_dt}',
# frame_size=2000,
)
bars, bars_array = out
if not bars:
# TODO: duration lookup for this
end_dt = end_dt.subtract(days=1)
print("SUBTRACTING DAY")
continue
if bars_array is None:
raise SymbolNotFound(fqsn)
@@ -317,10 +306,18 @@ async def get_bars(
assert time[-1] == last_dt.timestamp()
assert time[0] == first_dt.timestamp()
log.info(
f'{len(bars)} bars retreived for {first_dt} -> {last_dt}'
f'{len(bars)} bars retreived {first_dt} -> {last_dt}'
)
return (bars, bars_array, first_dt, last_dt), fails
if data_cs:
data_cs.cancel()
result = (bars, bars_array, first_dt, last_dt)
# signal data reset loop parent task
result_ready.set()
return result
except RequestError as err:
msg = err.message
@@ -345,14 +342,20 @@ async def get_bars(
)
# try to decrement start point and look further back
# end_dt = last_dt = last_dt.subtract(seconds=2000)
# end_dt = end_dt.subtract(seconds=2000)
end_dt = end_dt.subtract(days=1)
print("SUBTRACTING DAY")
continue
raise NoData(
f'Symbol: {fqsn}',
# TODO: fix this since we don't strictly use 1s
# ohlc any more XD
frame_size=2000,
elif (
err.code == 162 and
'API historical data query cancelled' in err.message
):
log.warning(
'Query cancelled by IB (:eyeroll:):\n'
f'{err.message}'
)
continue
# elif (
# err.code == 162 and
@@ -362,103 +365,58 @@ async def get_bars(
# log.warning("ignoring ip address warning")
# continue
# XXX: more or less same as above timeout case
elif _pacing in msg:
log.warning(
'History throttle rate reached!\n'
'Resetting farms with `ctrl-alt-f` hack\n'
)
await wait_on_data_reset(proxy)
# cancel any existing reset task
if data_cs:
data_cs.cancel()
# spawn new data reset task
data_cs, reset_done = await nurse.start(
partial(
wait_on_data_reset,
proxy,
timeout=float('inf'),
reset_type='connection'
)
)
continue
else:
raise
return None, None
# else: # throttle wasn't fixed so error out immediately
# raise _err
async with trio.open_nursery() as nurse:
# start history request that we allow
# to run indefinitely until a result is acquired
nurse.start_soon(query)
async def backfill_bars(
# start history reset loop which waits up to the timeout
# for a result before triggering a data feed reset.
while not result_ready.is_set():
fqsn: str,
shm: ShmArray, # type: ignore # noqa
timeframe: float = 1, # in seconds
with trio.move_on_after(timeout):
await result_ready.wait()
continue
# TODO: we want to avoid overrunning the underlying shm array buffer
# and we should probably calc the number of calls to make depending
# on that until we have the `marketstore` daemon in place in which
# case the shm size will be driven by user config and available sys
# memory.
count: int = 16,
task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,
) -> None:
'''
Fill historical bars into shared mem / storage afap.
TODO: avoid pacing constraints:
https://github.com/pikers/piker/issues/128
'''
# last_dt1 = None
last_dt = None
with trio.CancelScope() as cs:
async with open_data_client() as proxy:
out, fails = await get_bars(proxy, fqsn, timeframe)
if out is None:
raise RuntimeError("Could not pull currrent history?!")
(first_bars, bars_array, first_dt, last_dt) = out
vlm = bars_array['volume']
vlm[vlm < 0] = 0
last_dt = first_dt
# write historical data to buffer
shm.push(bars_array)
task_status.started(cs)
i = 0
while i < count:
out, fails = await get_bars(
# spawn new data reset task
data_cs, reset_done = await nurse.start(
partial(
wait_on_data_reset,
proxy,
fqsn,
timeframe,
end_dt=first_dt,
timeout=float('inf'),
# timeout=timeout,
)
)
# sync wait on reset to complete
await reset_done.wait()
if out is None:
# could be trying to retreive bars over weekend
# TODO: add logic here to handle tradable hours and
# only grab valid bars in the range
log.error(f"Can't grab bars starting at {first_dt}!?!?")
# XXX: get_bars() should internally decrement dt by
# 2k seconds and try again.
continue
(first_bars, bars_array, first_dt, last_dt) = out
# last_dt1 = last_dt
# last_dt = first_dt
# volume cleaning since there's -ve entries,
# wood luv to know what crookery that is..
vlm = bars_array['volume']
vlm[vlm < 0] = 0
# TODO we should probably dig into forums to see what peeps
# think this data "means" and then use it as an indicator of
# sorts? dinkus has mentioned that $vlms for the day dont'
# match other platforms nor the summary stat tws shows in
# the monitor - it's probably worth investigating.
shm.push(bars_array, prepend=True)
i += 1
return result
asset_type_map = {