Drop `trimeter`-ized concurrent history querying

It doesn't seem to be any slower on our least throttled backend
(binance) and it removes a bunch of hard to get correct frame
re-ordering logic that i'm not sure really ever fully worked XD

Commented some issues we still need to resolve as well.
ib_1m_hist
Tyler Goodlet 2022-09-28 15:22:05 -04:00
parent 7396624be0
commit 61ca5f7e19
1 changed files with 69 additions and 271 deletions

View File

@ -21,18 +21,19 @@ This module is enabled for ``brokerd`` daemons.
""" """
from __future__ import annotations from __future__ import annotations
from dataclasses import dataclass, field
from datetime import datetime
from contextlib import asynccontextmanager from contextlib import asynccontextmanager
from dataclasses import (
dataclass,
field,
)
from datetime import datetime
from functools import partial from functools import partial
from pprint import pformat
from types import ModuleType from types import ModuleType
from typing import ( from typing import (
Any, Any,
AsyncIterator, AsyncIterator,
Callable, Callable,
Optional, Optional,
Generator,
Awaitable, Awaitable,
TYPE_CHECKING, TYPE_CHECKING,
Union, Union,
@ -41,7 +42,6 @@ from typing import (
import trio import trio
from trio.abc import ReceiveChannel from trio.abc import ReceiveChannel
from trio_typing import TaskStatus from trio_typing import TaskStatus
import trimeter
import tractor import tractor
from tractor.trionics import maybe_open_context from tractor.trionics import maybe_open_context
import pendulum import pendulum
@ -300,6 +300,9 @@ async def start_backfill(
log.info(f'Pushing {to_push.size} to shm!') log.info(f'Pushing {to_push.size} to shm!')
shm.push(to_push) shm.push(to_push)
# TODO: *** THIS IS A BUG ***
# we need to only broadcast to subscribers for this fqsn..
# otherwise all fsps get reset on every chart..
for delay_s in sampler.subscribers: for delay_s in sampler.subscribers:
await broadcast(delay_s) await broadcast(delay_s)
@ -337,79 +340,31 @@ async def start_backfill(
last_tsdb_dt = start_dt.subtract(**kwargs) last_tsdb_dt = start_dt.subtract(**kwargs)
# configure async query throttling # configure async query throttling
erlangs = config.get('erlangs', 1) # rate = config.get('rate', 1)
rate = config.get('rate', 1) # XXX: legacy from ``trimeter`` code but unsupported now.
frames = {} # erlangs = config.get('erlangs', 1)
def iter_dts(start: datetime): # inline sequential loop where we simply pass the
# last retrieved start dt to the next request as
# it's end dt.
starts: set[datetime] = set()
while True: while start_dt > last_tsdb_dt:
hist_period = pendulum.period(
start,
last_tsdb_dt,
)
dtrange = list(hist_period.range('seconds', frame_size_s))
log.debug(f'New datetime index:\n{pformat(dtrange)}')
for end_dt in dtrange:
log.info(f'Yielding next frame start {end_dt}')
start = yield end_dt
# if caller sends a new start date, reset to that
if start is not None:
log.warning(f'Resetting date range: {start}')
break
else:
# from while
return
# pull new history frames until we hit latest
# already in the tsdb or a max count.
count = 0
# NOTE: when gaps are detected in the retreived history (by
# comparisor of the end - start versus the expected "frame size"
# in seconds) we need a way to alert the async request code not
# to continue to query for data "within the gap". This var is
# set in such cases such that further requests in that period
# are discarded and further we reset the "datetimem query frame
# index" in such cases to avoid needless noop requests.
earliest_end_dt: Optional[datetime] = start_dt
async def get_ohlc_frame(
input_end_dt: datetime,
iter_dts_gen: Generator[datetime],
) -> np.ndarray:
nonlocal count, frames, earliest_end_dt, frame_size_s
count += 1
if input_end_dt > earliest_end_dt:
# if a request comes in for an inter-gap frame we
# discard it since likely this request is still
# lingering from before the reset of ``iter_dts()`` via
# ``.send()`` below.
log.info(f'Discarding request history ending @ {input_end_dt}')
# signals to ``trimeter`` loop to discard and
# ``continue`` in it's schedule loop.
return None
print(f"QUERY end_dt={start_dt}")
try: try:
log.info( log.info(
f'Requesting {step_size_s}s frame ending in {input_end_dt}' f'Requesting {step_size_s}s frame ending in {start_dt}'
) )
array, start_dt, end_dt = await hist( array, start_dt, end_dt = await hist(
timeframe, timeframe,
end_dt=input_end_dt, end_dt=start_dt,
) )
assert array['time'][0] == start_dt.timestamp() assert array['time'][0] == start_dt.timestamp()
except NoData: except NoData:
log.warning( log.warning(
f'NO DATA for {frame_size_s}s frame @ {input_end_dt} ?!?' f'NO DATA for {frame_size_s}s frame @ {start_dt} ?!?'
) )
return None # discard signal return None # discard signal
@ -433,41 +388,27 @@ async def start_backfill(
# XXX: query result includes a start point prior to our # XXX: query result includes a start point prior to our
# expected "frame size" and thus is likely some kind of # expected "frame size" and thus is likely some kind of
# history gap (eg. market closed period, outage, etc.) # history gap (eg. market closed period, outage, etc.)
# so indicate to the request loop that this gap is # so just report it to console for now.
# expected by both,
# - resetting the ``iter_dts()`` generator to start at
# the new start point delivered in this result
# - setting the non-locally scoped ``earliest_end_dt``
# to this new value so that the request loop doesn't
# get tripped up thinking there's an out of order
# request-result condition.
log.warning( log.warning(
f'History frame ending @ {end_dt} appears to have a gap:\n' f'History frame ending @ {end_dt} appears to have a gap:\n'
f'{diff} ~= {frame_time_diff_s} seconds' f'{diff} ~= {frame_time_diff_s} seconds'
) )
# reset dtrange gen to new start point array, _start_dt, end_dt = await hist(
try: timeframe,
next_end = iter_dts_gen.send(start_dt) end_dt=start_dt,
log.info( )
f'Reset frame index to start at {start_dt}\n'
f'Was at {next_end}'
)
# NOTE: manually set "earliest end datetime" index-value if (
# to avoid the request loop getting confused about _start_dt in starts
# new frames that are earlier in history - i.e. this ):
# **is not** the case of out-of-order frames from print("SKIPPING DUPLICATE FRAME @ {_start_dt}")
# an async batch request. start_dt = min(starts)
earliest_end_dt = start_dt continue
except StopIteration: # only update new start point if new
# gen already terminated meaning we probably already start_dt = _start_dt
# exhausted it via frame requests. starts.add(start_dt)
log.info(
"Datetime index already exhausted, can't reset.."
)
to_push = diff_history( to_push = diff_history(
array, array,
@ -478,195 +419,53 @@ async def start_backfill(
ln = len(to_push) ln = len(to_push)
if ln: if ln:
log.info(f'{ln} bars for {start_dt} -> {end_dt}') log.info(f'{ln} bars for {start_dt} -> {end_dt}')
frames[input_end_dt.timestamp()] = (to_push, start_dt, end_dt)
return to_push, start_dt, end_dt
else: else:
log.warning( log.warning(
f'{ln} BARS TO PUSH after diff?!: {start_dt} -> {end_dt}' f'{ln} BARS TO PUSH after diff?!: {start_dt} -> {end_dt}'
) )
return None
# initial dt index starts at the start of the first query result # bail gracefully on shm allocation overrun/full condition
idts = iter_dts(start_dt) try:
shm.push(to_push, prepend=True)
except ValueError:
log.info(
f'Shm buffer overrun on: {start_dt} -> {end_dt}?'
)
break
async with trimeter.amap( log.info(
partial( f'Shm pushed {ln} frame:\n'
get_ohlc_frame, f'{start_dt} -> {end_dt}'
# we close in the ``iter_dt()`` gen in so we can send )
# reset signals as needed for gap dection in the
# history.
iter_dts_gen=idts,
),
idts,
capture_outcome=True, if (
include_value=True, storage is not None
and write_tsdb
):
log.info(
f'Writing {ln} frame to storage:\n'
f'{start_dt} -> {end_dt}'
)
await storage.write_ohlcv(
f'{bfqsn}.{mod.name}', # lul..
to_push,
timeframe,
)
# better technical names bruv... # TODO: can we only trigger this if the respective
max_at_once=erlangs, # history in "in view"?!?
max_per_second=rate,
) as outcomes: # XXX: extremely important, there can be no checkpoints
# in the block above to avoid entering new ``frames``
# Then iterate over the return values, as they become available # values while we're pipelining the current ones to
# (i.e., not necessarily in the original order) # memory...
async for input_end_dt, outcome in outcomes: for delay_s in sampler.subscribers:
await broadcast(delay_s)
try:
out = outcome.unwrap()
if out is None:
# skip signal
continue
elif isinstance(out, DataUnavailable):
# no data available case signal.. so just kill
# further requests and basically just stop
# trying...
break
except Exception:
log.exception('uhh trimeter bail')
raise
else:
to_push, start_dt, end_dt = out
if not len(to_push):
# diff returned no new data (i.e. we probablyl hit
# the ``last_tsdb_dt`` point).
# TODO: raise instead?
log.warning(f'No history for range {start_dt} -> {end_dt}')
continue
# pipeline-style pull frames until we need to wait for
# the next in order to arrive.
# i = end_dts.index(input_end_dt)
# print(f'latest end_dt {end_dt} found at index {i}')
epochs = list(reversed(sorted(frames)))
for epoch in epochs:
start = shm.array['time'][0]
last_shm_prepend_dt = pendulum.from_timestamp(start)
earliest_frame_queue_dt = pendulum.from_timestamp(epoch)
diff = start - epoch
if diff < 0:
log.warning(
'Discarding out of order frame:\n'
f'{earliest_frame_queue_dt}'
)
frames.pop(epoch)
continue
if diff > step_size_s:
if earliest_end_dt < earliest_frame_queue_dt:
# XXX: an expected gap was encountered (see
# logic in ``get_ohlc_frame()``, so allow
# this frame through to the storage layer.
log.warning(
f'Expected history gap of {diff}s:\n'
f'{earliest_frame_queue_dt} <- '
f'{earliest_end_dt}'
)
elif (
erlangs > 1
):
# we don't yet have the next frame to push
# so break back to the async request loop
# while we wait for more async frame-results
# to arrive.
if len(frames) >= erlangs:
log.warning(
'Frame count in async-queue is greater '
'then erlangs?\n'
'There seems to be a gap between:\n'
f'{earliest_frame_queue_dt} <- '
f'{last_shm_prepend_dt}\n'
'Conducting manual call for frame ending: '
f'{last_shm_prepend_dt}'
)
(
to_push,
start_dt,
end_dt,
) = await get_ohlc_frame(
input_end_dt=last_shm_prepend_dt,
iter_dts_gen=idts,
)
last_epoch = to_push['time'][-1]
diff = start - last_epoch
if diff > step_size_s:
await tractor.breakpoint()
raise DataUnavailable(
'An awkward frame was found:\n'
f'{start_dt} -> {end_dt}:\n{to_push}'
)
else:
frames[last_epoch] = (
to_push, start_dt, end_dt)
break
expect_end = pendulum.from_timestamp(start)
expect_start = expect_end.subtract(
seconds=frame_size_s)
log.warning(
'waiting on out-of-order history frame:\n'
f'{expect_end - expect_start}'
)
break
to_push, start_dt, end_dt = frames.pop(epoch)
ln = len(to_push)
# bail gracefully on shm allocation overrun/full condition
try:
shm.push(to_push, prepend=True)
except ValueError:
log.info(
f'Shm buffer overrun on: {start_dt} -> {end_dt}?'
)
break
log.info(
f'Shm pushed {ln} frame:\n'
f'{start_dt} -> {end_dt}'
)
# keep track of most recent "prepended" ``start_dt``
# both for detecting gaps and ensuring async
# frame-result order.
earliest_end_dt = start_dt
if (
storage is not None
and write_tsdb
):
log.info(
f'Writing {ln} frame to storage:\n'
f'{start_dt} -> {end_dt}'
)
await storage.write_ohlcv(
f'{bfqsn}.{mod.name}', # lul..
to_push,
timeframe,
)
# TODO: can we only trigger this if the respective
# history in "in view"?!?
# XXX: extremely important, there can be no checkpoints
# in the block above to avoid entering new ``frames``
# values while we're pipelining the current ones to
# memory...
for delay_s in sampler.subscribers:
await broadcast(delay_s)
# short-circuit (for now)
bf_done.set() bf_done.set()
return
async def manage_history( async def manage_history(
@ -789,7 +588,6 @@ async def manage_history(
else: else:
dt_diff_s = 0 dt_diff_s = 0
# await trio.sleep_forever()
# TODO: see if there's faster multi-field reads: # TODO: see if there's faster multi-field reads:
# https://numpy.org/doc/stable/user/basics.rec.html#accessing-multiple-fields # https://numpy.org/doc/stable/user/basics.rec.html#accessing-multiple-fields
# re-index with a `time` and index field # re-index with a `time` and index field