Rework history frame request concurrency

Manual tinker-testing demonstrated that triggering data resets
completely independent of the frame request gets more throughput and
further, that repeated requests (for the same frame after cancelling on
the `trio`-side) can yield duplicate frame responses. Re-work the
dual-task structure to instead have one task wait indefinitely on the
frame response (and thus not trigger duplicate frames) and the 2nd data
reset task poll for the first task to complete in a poll loop which
terminates when the frame arrives via an event.

Dirty deatz:
- make `get_bars()` take an optional timeout (which will eventually be
  dynamically passed from the history mgmt machinery) and move request
  logic inside a new `query()` closure meant to be spawned in a task
  which sets an event on frame arrival, add data reset poll loop in the
  main/parent task, deliver result on nursery completion.
- handle frame request cancelled event case without crash.
- on no-frame result (due to real history gap) hack in a 1 day decrement
  case which we need to eventually allow the caller to control likely
  based on measured frame rx latency.
- make `wait_on_data_reset()` a predicate without output indicating
  reset success as well as `trio.Nursery.start()` compat so that it can
  be started in a new task with the started values yielded being
  a cancel scope and completion event.
- drop the legacy `backfill_bars()`, not longer used.
clears_table_events
Tyler Goodlet 2022-09-28 13:18:15 -04:00
parent 844f8beaa7
commit c939e75ef9
1 changed files with 136 additions and 178 deletions

View File

@ -22,6 +22,7 @@ import asyncio
from contextlib import asynccontextmanager as acm from contextlib import asynccontextmanager as acm
from dataclasses import asdict from dataclasses import asdict
from datetime import datetime from datetime import datetime
from functools import partial
from math import isnan from math import isnan
import time import time
from typing import ( from typing import (
@ -38,7 +39,6 @@ import tractor
import trio import trio
from trio_typing import TaskStatus from trio_typing import TaskStatus
from piker.data._sharedmem import ShmArray
from .._util import SymbolNotFound, NoData from .._util import SymbolNotFound, NoData
from .api import ( from .api import (
# _adhoc_futes_set, # _adhoc_futes_set,
@ -111,6 +111,15 @@ async def open_history_client(
that takes in ``pendulum.datetime`` and returns ``numpy`` arrays. that takes in ``pendulum.datetime`` and returns ``numpy`` arrays.
''' '''
# TODO:
# - add logic to handle tradable hours and only grab
# valid bars in the range?
# - we want to avoid overrunning the underlying shm array buffer and
# we should probably calc the number of calls to make depending on
# that until we have the `marketstore` daemon in place in which case
# the shm size will be driven by user config and available sys
# memory.
async with open_data_client() as proxy: async with open_data_client() as proxy:
async def get_hist( async def get_hist(
@ -120,21 +129,19 @@ async def open_history_client(
) -> tuple[np.ndarray, str]: ) -> tuple[np.ndarray, str]:
out, fails = await get_bars( out = await get_bars(
proxy, proxy,
symbol, symbol,
timeframe, timeframe,
end_dt=end_dt, end_dt=end_dt,
) )
# TODO: add logic here to handle tradable hours and only grab
# valid bars in the range
if out is None: if out is None:
# could be trying to retreive bars over weekend # could be trying to retreive bars over weekend
log.error(f"Can't grab bars starting at {end_dt}!?!?") log.error(f"Can't grab bars starting at {end_dt}!?!?")
raise NoData( raise NoData(
f'{end_dt}', f'{end_dt}',
frame_size=2000, # frame_size=2000,
) )
bars, bars_array, first_dt, last_dt = out bars, bars_array, first_dt, last_dt = out
@ -162,11 +169,16 @@ _pacing: str = (
async def wait_on_data_reset( async def wait_on_data_reset(
proxy: MethodProxy, proxy: MethodProxy,
tries: int = 2, reset_type: str = 'data',
timeout: float = 16, timeout: float = 16,
task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED, task_status: TaskStatus[
): tuple[
trio.CancelScope,
trio.Event,
]
] = trio.TASK_STATUS_IGNORED,
) -> bool:
# TODO: we might have to put a task lock around this # TODO: we might have to put a task lock around this
# method.. # method..
@ -186,59 +198,43 @@ async def wait_on_data_reset(
# try to wait on the reset event(s) to arrive, a timeout # try to wait on the reset event(s) to arrive, a timeout
# will trigger a retry up to 6 times (for now). # will trigger a retry up to 6 times (for now).
# try 3 time with a data reset then fail over to done = trio.Event()
# a connection reset. with trio.move_on_after(timeout) as cs:
for i in range(1, tries):
task_status.started((cs, done))
log.warning('Sending DATA RESET request') log.warning('Sending DATA RESET request')
await data_reset_hack(reset_type='data') res = await data_reset_hack(reset_type=reset_type)
with trio.move_on_after(timeout) as cs:
for name, ev in [
# TODO: not sure if waiting on other events
# is all that useful here or not. in theory
# you could wait on one of the ones above
# first to verify the reset request was
# sent?
('history', hist_ev),
]:
task_status.started(cs)
await ev.wait()
log.info(f"{name} DATA RESET")
break
if (
cs.cancelled_caught
and not cs.cancel_called
):
log.warning(
f'Data reset {name} timeout, retrying {i}.'
)
continue
else:
log.warning('Sending CONNECTION RESET')
res = await data_reset_hack(reset_type='connection')
if not res: if not res:
log.warning( log.warning(
'NO VNC DETECTED!\n' 'NO VNC DETECTED!\n'
'Manually press ctrl-alt-f on your IB java app' 'Manually press ctrl-alt-f on your IB java app'
) )
done.set()
return False
with trio.move_on_after(timeout) as cs: # TODO: not sure if waiting on other events
for name, ev in [ # is all that useful here or not.
# TODO: not sure if waiting on other events # - in theory you could wait on one of the ones above first
# is all that useful here or not. in theory # to verify the reset request was sent?
# you could wait on one of the ones above # - we need the same for real-time quote feeds which can
# first to verify the reset request was # sometimes flake out and stop delivering..
# sent? for name, ev in [
('history', hist_ev), ('history', hist_ev),
]: ]:
await ev.wait() await ev.wait()
log.info(f"{name} DATA RESET") log.info(f"{name} DATA RESET")
done.set()
return True
if cs.cancelled_caught: if cs.cancel_called:
log.warning('Data CONNECTION RESET timeout!?') log.warning(
'Data reset task canceled?'
)
done.set()
return False
async def get_bars( async def get_bars(
@ -249,6 +245,7 @@ async def get_bars(
# blank to start which tells ib to look up the latest datum # blank to start which tells ib to look up the latest datum
end_dt: str = '', end_dt: str = '',
timeout: float = 1.5, # how long before we trigger a feed reset
task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED, task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,
@ -258,52 +255,44 @@ async def get_bars(
a ``MethoProxy``. a ``MethoProxy``.
''' '''
fails = 0 data_cs: Optional[trio.CancelScope] = None
bars: Optional[list] = None result: Optional[tuple[
first_dt: datetime = None ibis.objects.BarDataList,
last_dt: datetime = None np.ndarray,
datetime,
datetime,
]] = None
result_ready = trio.Event()
if end_dt: async def query():
last_dt = pendulum.from_timestamp(end_dt.timestamp()) nonlocal result, data_cs, end_dt
while True:
timeout: float = float('inf')
async with trio.open_nursery() as nurse:
for _ in range(10):
try: try:
out = None out = await proxy.bars(
with trio.move_on_after(timeout) as cs: fqsn=fqsn,
out = await proxy.bars( end_dt=end_dt,
fqsn=fqsn, sample_period_s=timeframe,
end_dt=end_dt,
sample_period_s=timeframe,
)
timeout = 3
if ( # ideally we cancel the request just before we
cs.cancelled_caught # cancel on the ``trio``-side and trigger a data
and out is None # reset hack.. the problem is there's no way (with
): # current impl) to detect a cancel case.
print(f"RESETTING DATA after {timeout}") # timeout=timeout,
await nurse.start( )
wait_on_data_reset, if out is None:
proxy,
timeout=float('inf'),
tries=100,
)
# scale timeout up exponentially to avoid
# request-overruning the history farm.
# timeout *= 2
continue
if out:
bars, bars_array = out
else:
raise NoData( raise NoData(
f'{end_dt}', f'{end_dt}',
# frame_size=2000, # frame_size=2000,
) )
bars, bars_array = out
if not bars:
# TODO: duration lookup for this
end_dt = end_dt.subtract(days=1)
print("SUBTRACTING DAY")
continue
if bars_array is None: if bars_array is None:
raise SymbolNotFound(fqsn) raise SymbolNotFound(fqsn)
@ -317,10 +306,18 @@ async def get_bars(
assert time[-1] == last_dt.timestamp() assert time[-1] == last_dt.timestamp()
assert time[0] == first_dt.timestamp() assert time[0] == first_dt.timestamp()
log.info( log.info(
f'{len(bars)} bars retreived for {first_dt} -> {last_dt}' f'{len(bars)} bars retreived {first_dt} -> {last_dt}'
) )
return (bars, bars_array, first_dt, last_dt), fails if data_cs:
data_cs.cancel()
result = (bars, bars_array, first_dt, last_dt)
# signal data reset loop parent task
result_ready.set()
return result
except RequestError as err: except RequestError as err:
msg = err.message msg = err.message
@ -345,14 +342,20 @@ async def get_bars(
) )
# try to decrement start point and look further back # try to decrement start point and look further back
# end_dt = last_dt = last_dt.subtract(seconds=2000) # end_dt = end_dt.subtract(seconds=2000)
end_dt = end_dt.subtract(days=1)
print("SUBTRACTING DAY")
continue
raise NoData( elif (
f'Symbol: {fqsn}', err.code == 162 and
# TODO: fix this since we don't strictly use 1s 'API historical data query cancelled' in err.message
# ohlc any more XD ):
frame_size=2000, log.warning(
'Query cancelled by IB (:eyeroll:):\n'
f'{err.message}'
) )
continue
# elif ( # elif (
# err.code == 162 and # err.code == 162 and
@ -362,103 +365,58 @@ async def get_bars(
# log.warning("ignoring ip address warning") # log.warning("ignoring ip address warning")
# continue # continue
# XXX: more or less same as above timeout case
elif _pacing in msg: elif _pacing in msg:
log.warning( log.warning(
'History throttle rate reached!\n' 'History throttle rate reached!\n'
'Resetting farms with `ctrl-alt-f` hack\n' 'Resetting farms with `ctrl-alt-f` hack\n'
) )
await wait_on_data_reset(proxy)
# cancel any existing reset task
if data_cs:
data_cs.cancel()
# spawn new data reset task
data_cs, reset_done = await nurse.start(
partial(
wait_on_data_reset,
proxy,
timeout=float('inf'),
reset_type='connection'
)
)
continue
else: else:
raise raise
return None, None async with trio.open_nursery() as nurse:
# else: # throttle wasn't fixed so error out immediately
# raise _err
# start history request that we allow
# to run indefinitely until a result is acquired
nurse.start_soon(query)
async def backfill_bars( # start history reset loop which waits up to the timeout
# for a result before triggering a data feed reset.
while not result_ready.is_set():
fqsn: str, with trio.move_on_after(timeout):
shm: ShmArray, # type: ignore # noqa await result_ready.wait()
timeframe: float = 1, # in seconds continue
# TODO: we want to avoid overrunning the underlying shm array buffer # spawn new data reset task
# and we should probably calc the number of calls to make depending data_cs, reset_done = await nurse.start(
# on that until we have the `marketstore` daemon in place in which partial(
# case the shm size will be driven by user config and available sys wait_on_data_reset,
# memory.
count: int = 16,
task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,
) -> None:
'''
Fill historical bars into shared mem / storage afap.
TODO: avoid pacing constraints:
https://github.com/pikers/piker/issues/128
'''
# last_dt1 = None
last_dt = None
with trio.CancelScope() as cs:
async with open_data_client() as proxy:
out, fails = await get_bars(proxy, fqsn, timeframe)
if out is None:
raise RuntimeError("Could not pull currrent history?!")
(first_bars, bars_array, first_dt, last_dt) = out
vlm = bars_array['volume']
vlm[vlm < 0] = 0
last_dt = first_dt
# write historical data to buffer
shm.push(bars_array)
task_status.started(cs)
i = 0
while i < count:
out, fails = await get_bars(
proxy, proxy,
fqsn, timeout=float('inf'),
timeframe, # timeout=timeout,
end_dt=first_dt,
) )
)
# sync wait on reset to complete
await reset_done.wait()
if out is None: return result
# could be trying to retreive bars over weekend
# TODO: add logic here to handle tradable hours and
# only grab valid bars in the range
log.error(f"Can't grab bars starting at {first_dt}!?!?")
# XXX: get_bars() should internally decrement dt by
# 2k seconds and try again.
continue
(first_bars, bars_array, first_dt, last_dt) = out
# last_dt1 = last_dt
# last_dt = first_dt
# volume cleaning since there's -ve entries,
# wood luv to know what crookery that is..
vlm = bars_array['volume']
vlm[vlm < 0] = 0
# TODO we should probably dig into forums to see what peeps
# think this data "means" and then use it as an indicator of
# sorts? dinkus has mentioned that $vlms for the day dont'
# match other platforms nor the summary stat tws shows in
# the monitor - it's probably worth investigating.
shm.push(bars_array, prepend=True)
i += 1
asset_type_map = { asset_type_map = {