Support async-batched ohlc queries in all backends

Expect each backend to deliver a `config: dict[str, Any]` which provides
concurrency controls to `trimeter`'s batch task scheduler such that
backends can define their own concurrency limits.

The dirty deats in this patch include handling history "gaps" where
a query returns a history-frame-result which spans more then the typical
frame size (in seconds). In such cases we reset the target frame index
(datetime index sequence implemented with a `pendulum.Period`) using
a generator protocol `.send()` such that the sequence can be dynamically
re-indexed starting at the new (possibly) pre-gap datetime. The new gap
logic also allows us to detect out of order frames easier and thus wait
for the next-in-order to arrive before making more requests.
incr_update_backup
Tyler Goodlet 2022-05-03 13:19:49 -04:00
parent 303a5cc66c
commit 1967bc7973
4 changed files with 232 additions and 78 deletions

View File

@ -402,7 +402,7 @@ async def open_history_client(
end_dt = pendulum.from_timestamp(array[-1]['time'])
return array, start_dt, end_dt
yield get_ohlc
yield get_ohlc, {'erlangs': 4, 'rate': 4}
async def backfill_bars(

View File

@ -57,6 +57,8 @@ from ib_insync.wrapper import Wrapper
from ib_insync.client import Client as ib_Client
from fuzzywuzzy import process as fuzzy
import numpy as np
import pendulum
from .. import config
from ..log import get_logger, get_console_log
@ -1442,8 +1444,6 @@ async def get_bars(
a ``MethoProxy``.
'''
import pendulum
fails = 0
bars: Optional[list] = None
first_dt: datetime = None
@ -1471,7 +1471,9 @@ async def get_bars(
time = bars_array['time']
assert time[-1] == last_dt.timestamp()
assert time[0] == first_dt.timestamp()
log.info(f'bars retreived for dts {first_dt}:{last_dt}')
log.info(
f'{len(bars)} bars retreived for {first_dt} -> {last_dt}'
)
return (bars, bars_array, first_dt, last_dt), fails
@ -1485,20 +1487,27 @@ async def get_bars(
raise NoData(
f'Symbol: {fqsn}',
)
break
elif (
err.code == 162
and 'HMDS query returned no data' in err.message
):
# try to decrement start point and look further back
end_dt = last_dt = last_dt.subtract(seconds=2000)
# XXX: this is now done in the storage mgmt layer
# and we shouldn't implicitly decrement the frame dt
# index since the upper layer may be doing so
# concurrently and we don't want to be delivering frames
# that weren't asked for.
log.warning(
f'No data found ending @ {end_dt}\n'
f'Starting another request for {end_dt}'
f'NO DATA found ending @ {end_dt}\n'
)
continue
# try to decrement start point and look further back
# end_dt = last_dt = last_dt.subtract(seconds=2000)
raise NoData(
f'Symbol: {fqsn}',
frame_size=2000,
)
elif _pacing in msg:
@ -1578,7 +1587,12 @@ async def open_history_client(
return bars_array, first_dt, last_dt
yield get_hist
# TODO: it seems like we can do async queries for ohlc
# but getting the order right still isn't working and I'm not
# quite sure why.. needs some tinkering and probably
# a lookthrough of the ``ib_insync`` machinery, for eg. maybe
# we have to do the batch queries on the `asyncio` side?
yield get_hist, {'erlangs': 1, 'rate': 6}
async def backfill_bars(
@ -1840,6 +1854,7 @@ async def stream_quotes(
symbol=sym,
)
first_quote = normalize(first_ticker)
# print(f'first quote: {first_quote}')
def mk_init_msgs() -> dict[str, dict]:
# pass back some symbol info like min_tick, trading_hours, etc.

View File

@ -1066,7 +1066,7 @@ async def open_history_client(
end_dt = pendulum.from_timestamp(array[-1]['time'])
return array, start_dt, end_dt
yield get_ohlc
yield get_ohlc, {'erlangs': 1, 'rate': 1}
async def backfill_bars(

View File

@ -29,6 +29,7 @@ from types import ModuleType
from typing import (
Any,
AsyncIterator, Optional,
Generator,
Awaitable,
)
@ -241,7 +242,7 @@ async def start_backfill(
) -> int:
async with mod.open_history_client(bfqsn) as hist:
async with mod.open_history_client(bfqsn) as (hist, config):
# get latest query's worth of history all the way
# back to what is recorded in the tsdb
@ -260,7 +261,9 @@ async def start_backfill(
for delay_s in sampler.subscribers:
await broadcast(delay_s)
# signal that backfilling to tsdb's end datum is complete
bf_done = trio.Event()
# let caller unblock and deliver latest history frame
task_status.started((shm, start_dt, end_dt, bf_done))
@ -269,7 +272,7 @@ async def start_backfill(
pendulum.from_timestamp(times[-1]) -
pendulum.from_timestamp(times[-2])
).seconds
frame_step_s = (end_dt - start_dt).seconds
frame_size_s = len(to_push) * step_size_s
if last_tsdb_dt is None:
# maybe a better default (they don't seem to define epoch?!)
@ -277,7 +280,7 @@ async def start_backfill(
# based on the sample step size load a certain amount
# history
if step_size_s == 1:
last_tsdb_dt = pendulum.now().subtract(weeks=2)
last_tsdb_dt = pendulum.now().subtract(days=6)
elif step_size_s == 60:
last_tsdb_dt = pendulum.now().subtract(years=2)
@ -290,69 +293,159 @@ async def start_backfill(
'do dat bruh.'
)
hist_period = pendulum.period(
start_dt.subtract(seconds=step_size_s),
last_tsdb_dt,
)
end_dts = list(hist_period.range('seconds', frame_step_s))
# configure async query throttling
erlangs = config.get('erlangs', 1)
rate = config.get('rate', 1)
frames = {}
def iter_dts(start: datetime):
while True:
hist_period = pendulum.period(
start.subtract(seconds=step_size_s),
last_tsdb_dt,
)
dtrange = hist_period.range('seconds', frame_size_s)
for end_dt in dtrange:
log.warning(f'Yielding next frame start {end_dt}')
start = yield end_dt
# if caller sends a new start date, reset to that
if start is not None:
log.warning(f'Resetting date range: {start}')
# import pdbpp
# pdbpp.set_trace()
break
else:
# from while
return
# pull new history frames until we hit latest
# already in the tsdb or a max count.
count = 0
frames = {}
# NOTE: when gaps are detected in the retreived history (by
# comparisor of the end - start versus the expected "frame size"
# in seconds) we need a way to alert the async request code not
# to continue to query for data "within the gap". This var is
# set in such cases such that further requests in that period
# are discarded and further we reset the "datetimem query frame
# index" in such cases to avoid needless noop requests.
earliest_end_dt: Optional[datetime] = start_dt
async def get_ohlc_frame(
input_end_dt: datetime,
iter_dts_gen: Generator[datetime],
) -> np.ndarray:
nonlocal count
nonlocal count, frames, earliest_end_dt, frame_size_s
count += 1
if input_end_dt > earliest_end_dt:
# if a request comes in for an inter-gap frame we
# discard it since likely this request is still
# lingering from before the reset of ``iter_dts()`` via
# ``.send()`` below.
log.info(f'Discarding request history ending @ {input_end_dt}')
# signals to ``trimeter`` loop to discard and
# ``continue`` in it's schedule loop.
return None
try:
log.info(
f'Requesting {step_size_s}s frame ending in {input_end_dt}'
)
array, start_dt, end_dt = await hist(end_dt=input_end_dt)
# if input_end_dt.timestamp() == end_dts[0].timestamp():
# await tractor.breakpoint()
assert array['time'][0] == start_dt.timestamp()
except NoData:
# decrement by the diff in time last delivered.
end_dt = start_dt.subtract(seconds=(end_dt - start_dt).seconds)
log.warning('no data for range {(end_dt - start_dt)} ?!?')
# continue
log.warning(
f'NO DATA for {frame_size_s}s frame @ {end_dt} ?!?'
)
return None # discard signal
except DataUnavailable:
except DataUnavailable as duerr:
# broker is being a bish and we can't pull
# any more..
log.warning('backend halted on data deliver !?!?')
return input_end_dt, None
# ugh, what's a better way?
# TODO: fwiw, we probably want a way to signal a throttle
# condition (eg. with ib) so that we can halt the
# request loop until the condition is resolved?
return duerr
diff = end_dt - start_dt
frame_time_diff_s = diff.seconds
expected_frame_size_s = frame_size_s + step_size_s
if frame_time_diff_s > expected_frame_size_s:
# XXX: query result includes a start point prior to our
# expected "frame size" and thus is likely some kind of
# history gap (eg. market closed period, outage, etc.)
# so indicate to the request loop that this gap is
# expected by both,
# - resetting the ``iter_dts()`` generator to start at
# the new start point delivered in this result
# - setting the non-locally scoped ``earliest_end_dt``
# to this new value so that the request loop doesn't
# get tripped up thinking there's an out of order
# request-result condition.
log.warning(
f'History frame ending @ {end_dt} appears to have a gap:\n'
f'{diff} ~= {frame_time_diff_s} seconds'
)
# reset dtrange gen to new start point
next_end = iter_dts_gen.send(start_dt)
log.info(
f'Reset frame index to start at {start_dt}\n'
f'Was at {next_end}'
)
# TODO: can we avoid this?
earliest_end_dt = start_dt
to_push = diff_history(
array,
start_dt,
end_dt,
last_tsdb_dt=last_tsdb_dt,
# XXX: hacky, just run indefinitely
# last_tsdb_dt=None,
)
print(f"PULLING {count}")
log.info(f'Pushing {to_push.size} to shm!')
ln = len(to_push)
if ln:
log.info(f'{ln} bars for {start_dt} -> {end_dt}')
frames[input_end_dt.timestamp()] = (to_push, start_dt, end_dt)
return to_push, start_dt, end_dt
frames[input_end_dt.timestamp()] = (to_push, start_dt, end_dt)
else:
log.warning(
f'{ln} BARS TO PUSH after diff?!: {start_dt} -> {end_dt}'
)
return None
return to_push, start_dt, end_dt
# initial dt index starts at the start of the first query result
idts = iter_dts(start_dt)
# if to_push.size < 1:
# print('UHHH SIZE <1 BREAKING!?')
# break
rate = erlangs = 5
async with trimeter.amap(
get_ohlc_frame,
end_dts,
partial(
get_ohlc_frame,
# we close in the ``iter_dt()`` gen in so we can send
# reset signals as needed for gap dection in the
# history.
iter_dts_gen=idts,
),
idts,
capture_outcome=True,
include_value=True,
# better technical names bruv...
max_at_once=erlangs,
max_per_second=rate,
@ -362,59 +455,101 @@ async def start_backfill(
# (i.e., not necessarily in the original order)
async for input_end_dt, outcome in outcomes:
# no data available case..
if outcome is None:
break
try:
out = outcome.unwrap()
if out is None:
# skip signal
continue
elif isinstance(out, DataUnavailable):
# no data available case signal.. so just kill
# further requests and basically just stop
# trying...
break
except Exception:
log.exception('uhh trimeter bail')
raise
else:
to_push, start_dt, end_dt = out
if not len(to_push):
# diff returned no new data (i.e. we probablyl hit
# the ``last_tsdb_dt`` point).
# TODO: raise instead?
log.warning(f'No history for range {start_dt} -> {end_dt}')
continue
# pipeline-style pull frames until we need to wait for
# the next in order to arrive.
i = end_dts.index(input_end_dt)
print(f'latest end_dt {end_dt} found at index {i}')
# i = end_dts.index(input_end_dt)
# print(f'latest end_dt {end_dt} found at index {i}')
for epoch in reversed(sorted(frames)):
epochs = list(reversed(sorted(frames)))
for epoch in epochs:
start = shm.array['time'][0]
# we don't yet have the next frame to push
# so break back to the async request loop.
diff = epoch - start
if abs(diff) > step_size_s:
if len(frames) > 20:
if earliest_end_dt < end_dt:
# XXX: an expected gap was encountered (see
# logic in ``get_ohlc_frame()``, so allow
# this frame through to the storage layer.
log.warning(
f'there appears to be a history gap of {diff}?'
f'there is an expected history gap of {diff}s:'
)
elif (
erlangs > 1
and len(epochs) < erlangs
):
# we don't yet have the next frame to push
# so break back to the async request loop
# while we wait for more async frame-results
# to arrive.
expect_end = pendulum.from_timestamp(start)
expect_start = expect_end.subtract(
seconds=frame_size_s)
log.warning(
'waiting on out-of-order history frame:\n'
f'{expect_end - expect_start}'
)
# from pprint import pprint
# await tractor.breakpoint()
else:
break
to_push, start_dt, end_dt = frames.pop(epoch)
print(f'pushing frame ending at {end_dt}')
if not len(to_push):
break
# bail on shm allocation overrun
# bail gracefully on shm allocation overrun/full condition
try:
shm.push(to_push, prepend=True)
except ValueError:
await tractor.breakpoint()
log.info(
f'Shm buffer overrun on: {start_dt} -> {end_dt}?'
)
# await tractor.breakpoint()
break
for delay_s in sampler.subscribers:
await broadcast(delay_s)
log.info(
f'Shm pushed {len(to_push)} frame:\n'
f'{start_dt} -> {end_dt}'
)
# keep track of most recent "prepended" ``start_dt``
# both for detecting gaps and ensuring async
# frame-result order.
earliest_end_dt = start_dt
# TODO: can we only trigger this if the respective
# history in "in view"?!?
# XXX: extremely important, there can be no checkpoints
# in the block above to avoid entering new ``frames``
# values while we're pipelining the current ones to
# memory...
for delay_s in sampler.subscribers:
await broadcast(delay_s)
bf_done.set()
# update start index to include all tsdb history
# that was pushed in the caller parent task.
# shm._first.value = 0
async def manage_history(
@ -490,6 +625,17 @@ async def manage_history(
last_tsdb_dt=last_dt,
)
)
# if len(shm.array) < 2:
# TODO: there's an edge case here to solve where if the last
# frame before market close (at least on ib) was pushed and
# there was only "1 new" row pushed from the first backfill
# query-iteration, then the sample step sizing calcs will
# break upstream from here since you can't diff on at least
# 2 steps... probably should also add logic to compute from
# the tsdb series and stash that somewhere as meta data on
# the shm buffer?.. no se.
task_status.started(shm)
some_data_ready.set()
@ -524,14 +670,7 @@ async def manage_history(
prepend=True,
# update_first=False,
# start=prepend_start,
field_map={
'Epoch': 'time',
'Open': 'open',
'High': 'high',
'Low': 'low',
'Close': 'close',
'Volume': 'volume',
},
field_map=marketstore.ohlc_key_map,
)
# load as much from storage into shm as spacec will