Add basic tsdb history loading
If `marketstore` is detected try to only load most recent missing data from the data provider (broker) and the rest from the tsdb and push it all to shm for display in the UI. If the provider/broker doesn't have the history client endpoint, just use the old one for now so we can start to incrementally add support. Don't start the ohlc step incrementer task until the backend signals that the feed is live.marketstore_backup
parent
85e2602d2e
commit
75d0bf3152
|
@ -22,7 +22,6 @@ This module is enabled for ``brokerd`` daemons.
|
||||||
"""
|
"""
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
from dataclasses import dataclass, field
|
from dataclasses import dataclass, field
|
||||||
from datetime import datetime
|
|
||||||
from contextlib import asynccontextmanager
|
from contextlib import asynccontextmanager
|
||||||
from functools import partial
|
from functools import partial
|
||||||
from types import ModuleType
|
from types import ModuleType
|
||||||
|
@ -32,6 +31,8 @@ from typing import (
|
||||||
Awaitable,
|
Awaitable,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
import pendulum
|
||||||
import trio
|
import trio
|
||||||
from trio.abc import ReceiveChannel
|
from trio.abc import ReceiveChannel
|
||||||
from trio_typing import TaskStatus
|
from trio_typing import TaskStatus
|
||||||
|
@ -49,7 +50,6 @@ from ._sharedmem import (
|
||||||
maybe_open_shm_array,
|
maybe_open_shm_array,
|
||||||
attach_shm_array,
|
attach_shm_array,
|
||||||
ShmArray,
|
ShmArray,
|
||||||
_secs_in_day,
|
|
||||||
)
|
)
|
||||||
from .ingest import get_ingestormod
|
from .ingest import get_ingestormod
|
||||||
from ._source import (
|
from ._source import (
|
||||||
|
@ -236,119 +236,137 @@ async def manage_history(
|
||||||
# we expect the sub-actor to write
|
# we expect the sub-actor to write
|
||||||
readonly=False,
|
readonly=False,
|
||||||
)
|
)
|
||||||
|
# TODO: history validation
|
||||||
|
if not opened:
|
||||||
|
raise RuntimeError(
|
||||||
|
"Persistent shm for sym was already open?!"
|
||||||
|
)
|
||||||
|
|
||||||
log.info('Scanning for existing `marketstored`')
|
log.info('Scanning for existing `marketstored`')
|
||||||
|
|
||||||
is_up = await check_for_service('marketstored')
|
is_up = await check_for_service('marketstored')
|
||||||
|
|
||||||
# for now only do backfilling if no tsdb can be found
|
# for now only do backfilling if no tsdb can be found
|
||||||
do_backfill = not is_up and opened
|
do_legacy_backfill = not is_up and opened
|
||||||
|
|
||||||
|
open_history_client = getattr(mod, 'open_history_client', None)
|
||||||
|
|
||||||
|
if is_up and opened and open_history_client:
|
||||||
|
|
||||||
if is_up and opened:
|
|
||||||
log.info('Found existing `marketstored`')
|
log.info('Found existing `marketstored`')
|
||||||
from . import marketstore
|
from . import marketstore
|
||||||
|
|
||||||
async with marketstore.open_storage_client(
|
async with marketstore.open_storage_client(
|
||||||
fqsn,
|
fqsn,
|
||||||
) as (storage, tsdb_arrays):
|
) as storage:
|
||||||
|
|
||||||
# TODO: get this shit workin
|
tsdb_arrays = await storage.read_ohlcv(fqsn)
|
||||||
from tractor.trionics import ipython_embed
|
|
||||||
await ipython_embed()
|
|
||||||
# await ipython_embed(ns=locals())
|
|
||||||
|
|
||||||
# TODO: history validation
|
if not tsdb_arrays:
|
||||||
# assert opened, f'Persistent shm for {symbol} was already open?!'
|
do_legacy_backfill = True
|
||||||
# if not opened:
|
|
||||||
# raise RuntimeError(
|
|
||||||
# "Persistent shm for sym was already open?!"
|
|
||||||
# )
|
|
||||||
|
|
||||||
if tsdb_arrays:
|
else:
|
||||||
log.info(f'Loaded tsdb history {tsdb_arrays}')
|
log.info(f'Loaded tsdb history {tsdb_arrays}')
|
||||||
fastest = list(tsdb_arrays[fqsn].values())[0]
|
|
||||||
last_s = fastest['Epoch'][-1]
|
|
||||||
|
|
||||||
# TODO: see if there's faster multi-field reads:
|
fastest = list(tsdb_arrays.values())[0]
|
||||||
# https://numpy.org/doc/stable/user/basics.rec.html#accessing-multiple-fields
|
times = fastest['Epoch']
|
||||||
|
first, last = times[0], times[-1]
|
||||||
# re-index with a `time` and index field
|
first_tsdb_dt, last_tsdb_dt = map(
|
||||||
shm.push(
|
pendulum.from_timestamp, [first, last]
|
||||||
fastest[-3 * _secs_in_day:],
|
|
||||||
|
|
||||||
# insert the history pre a "days worth" of samples
|
|
||||||
# to leave some real-time buffer space at the end.
|
|
||||||
prepend=True,
|
|
||||||
start=shm._len - _secs_in_day,
|
|
||||||
field_map={
|
|
||||||
'Epoch': 'time',
|
|
||||||
'Open': 'open',
|
|
||||||
'High': 'high',
|
|
||||||
'Low': 'low',
|
|
||||||
'Close': 'close',
|
|
||||||
'Volume': 'volume',
|
|
||||||
},
|
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# TODO: this should be used verbatim for the pure
|
||||||
|
# shm backfiller approach below.
|
||||||
|
|
||||||
# start history anal and load missing new data via backend.
|
# start history anal and load missing new data via backend.
|
||||||
async with mod.open_history_client(fqsn) as hist:
|
async with open_history_client(fqsn) as hist:
|
||||||
|
|
||||||
# get latest query's worth of history
|
# get latest query's worth of history all the way
|
||||||
array, next_dt = await hist(end_dt='')
|
# back to what is recorded in the tsdb
|
||||||
|
array, start_dt, end_dt = await hist(end_dt='')
|
||||||
|
shm.push(array)
|
||||||
|
|
||||||
last_dt = datetime.fromtimestamp(last_s)
|
# let caller unblock and deliver latest history frame
|
||||||
array, next_dt = await hist(end_dt=last_dt)
|
task_status.started(shm)
|
||||||
else:
|
some_data_ready.set()
|
||||||
do_backfill = True
|
|
||||||
|
|
||||||
# await tractor.breakpoint()
|
# pull new history frames until we hit latest
|
||||||
|
# already in the tsdb
|
||||||
|
while start_dt > last_tsdb_dt:
|
||||||
|
|
||||||
some_data_ready.set()
|
array, start_dt, end_dt = await hist(end_dt=start_dt)
|
||||||
|
s_diff = (last_tsdb_dt - start_dt).seconds
|
||||||
|
|
||||||
if do_backfill:
|
# if we detect a partial frame's worth of data
|
||||||
|
# that is new, slice out only that history and
|
||||||
|
# write to shm.
|
||||||
|
if s_diff > 0:
|
||||||
|
assert last_tsdb_dt > start_dt
|
||||||
|
selected = array['time'] > last_tsdb_dt.timestamp()
|
||||||
|
to_push = array[selected]
|
||||||
|
log.info(
|
||||||
|
f'Pushing partial frame {to_push.size} to shm'
|
||||||
|
)
|
||||||
|
shm.push(to_push, prepend=True)
|
||||||
|
break
|
||||||
|
|
||||||
|
else:
|
||||||
|
# write to shm
|
||||||
|
log.info(f'Pushing {array.size} datums to shm')
|
||||||
|
shm.push(array, prepend=True)
|
||||||
|
|
||||||
|
# TODO: see if there's faster multi-field reads:
|
||||||
|
# https://numpy.org/doc/stable/user/basics.rec.html#accessing-multiple-fields
|
||||||
|
# re-index with a `time` and index field
|
||||||
|
# await tractor.breakpoint()
|
||||||
|
|
||||||
|
shm.push(
|
||||||
|
fastest[-shm._first.value:],
|
||||||
|
|
||||||
|
# insert the history pre a "days worth" of samples
|
||||||
|
# to leave some real-time buffer space at the end.
|
||||||
|
prepend=True,
|
||||||
|
# start=shm._len - _secs_in_day,
|
||||||
|
field_map={
|
||||||
|
'Epoch': 'time',
|
||||||
|
'Open': 'open',
|
||||||
|
'High': 'high',
|
||||||
|
'Low': 'low',
|
||||||
|
'Close': 'close',
|
||||||
|
'Volume': 'volume',
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
|
# TODO: write new data to tsdb to be ready to for next
|
||||||
|
# read.
|
||||||
|
|
||||||
|
if do_legacy_backfill:
|
||||||
|
# do a legacy incremental backfill from the provider.
|
||||||
log.info('No existing `marketstored` found..')
|
log.info('No existing `marketstored` found..')
|
||||||
|
|
||||||
|
bfqsn = fqsn.replace('.' + mod.name, '')
|
||||||
# start history backfill task ``backfill_bars()`` is
|
# start history backfill task ``backfill_bars()`` is
|
||||||
# a required backend func this must block until shm is
|
# a required backend func this must block until shm is
|
||||||
# filled with first set of ohlc bars
|
# filled with first set of ohlc bars
|
||||||
await bus.nursery.start(
|
await bus.nursery.start(
|
||||||
start_backfill,
|
start_backfill,
|
||||||
mod,
|
mod,
|
||||||
fqsn,
|
bfqsn,
|
||||||
shm,
|
shm,
|
||||||
)
|
)
|
||||||
|
|
||||||
# _ = await bus.nursery.start(mod.backfill_bars, fqsn, shm)
|
# yield back after client connect with filled shm
|
||||||
|
task_status.started(shm)
|
||||||
|
|
||||||
# yield back after client connect with filled shm
|
# indicate to caller that feed can be delivered to
|
||||||
task_status.started(shm)
|
# remote requesting client since we've loaded history
|
||||||
|
# data that can be used.
|
||||||
# indicate to caller that feed can be delivered to
|
some_data_ready.set()
|
||||||
# remote requesting client since we've loaded history
|
|
||||||
# data that can be used.
|
|
||||||
some_data_ready.set()
|
|
||||||
|
|
||||||
# detect sample step size for sampled historical data
|
|
||||||
times = shm.array['time']
|
|
||||||
delay_s = times[-1] - times[times != times[-1]][-1]
|
|
||||||
|
|
||||||
# begin real-time updates of shm and tsb once the feed
|
|
||||||
# goes live.
|
|
||||||
await feed_is_live.wait()
|
|
||||||
|
|
||||||
if opened:
|
|
||||||
sampler.ohlcv_shms.setdefault(delay_s, []).append(shm)
|
|
||||||
|
|
||||||
# start shm incrementing for OHLC sampling at the current
|
|
||||||
# detected sampling period if one dne.
|
|
||||||
if sampler.incrementers.get(delay_s) is None:
|
|
||||||
await bus.start_task(
|
|
||||||
increment_ohlc_buffer,
|
|
||||||
delay_s,
|
|
||||||
)
|
|
||||||
|
|
||||||
|
# history retreival loop depending on user interaction and thus
|
||||||
|
# a small RPC-prot for remotely controllinlg what data is loaded
|
||||||
|
# for viewing.
|
||||||
await trio.sleep_forever()
|
await trio.sleep_forever()
|
||||||
# cs.cancel()
|
|
||||||
|
|
||||||
|
|
||||||
async def allocate_persistent_feed(
|
async def allocate_persistent_feed(
|
||||||
|
@ -416,7 +434,7 @@ async def allocate_persistent_feed(
|
||||||
manage_history,
|
manage_history,
|
||||||
mod,
|
mod,
|
||||||
bus,
|
bus,
|
||||||
bfqsn,
|
'.'.join((bfqsn, brokername)),
|
||||||
some_data_ready,
|
some_data_ready,
|
||||||
feed_is_live,
|
feed_is_live,
|
||||||
)
|
)
|
||||||
|
@ -429,7 +447,6 @@ async def allocate_persistent_feed(
|
||||||
|
|
||||||
# true fqsn
|
# true fqsn
|
||||||
fqsn = '.'.join((bfqsn, brokername))
|
fqsn = '.'.join((bfqsn, brokername))
|
||||||
|
|
||||||
# add a fqsn entry that includes the ``.<broker>`` suffix
|
# add a fqsn entry that includes the ``.<broker>`` suffix
|
||||||
init_msg[fqsn] = msg
|
init_msg[fqsn] = msg
|
||||||
|
|
||||||
|
@ -464,9 +481,22 @@ async def allocate_persistent_feed(
|
||||||
if not start_stream:
|
if not start_stream:
|
||||||
await trio.sleep_forever()
|
await trio.sleep_forever()
|
||||||
|
|
||||||
# backend will indicate when real-time quotes have begun.
|
# begin real-time updates of shm and tsb once the feed goes live and
|
||||||
|
# the backend will indicate when real-time quotes have begun.
|
||||||
await feed_is_live.wait()
|
await feed_is_live.wait()
|
||||||
|
|
||||||
|
# start shm incrementer task for OHLC style sampling
|
||||||
|
# at the current detected step period.
|
||||||
|
times = shm.array['time']
|
||||||
|
delay_s = times[-1] - times[times != times[-1]][-1]
|
||||||
|
|
||||||
|
sampler.ohlcv_shms.setdefault(delay_s, []).append(shm)
|
||||||
|
if sampler.incrementers.get(delay_s) is None:
|
||||||
|
await bus.start_task(
|
||||||
|
increment_ohlc_buffer,
|
||||||
|
delay_s,
|
||||||
|
)
|
||||||
|
|
||||||
sum_tick_vlm: bool = init_msg.get(
|
sum_tick_vlm: bool = init_msg.get(
|
||||||
'shm_write_opts', {}
|
'shm_write_opts', {}
|
||||||
).get('sum_tick_vlm', True)
|
).get('sum_tick_vlm', True)
|
||||||
|
@ -545,7 +575,7 @@ async def open_feed_bus(
|
||||||
init_msg, first_quotes = bus.feeds[symbol]
|
init_msg, first_quotes = bus.feeds[symbol]
|
||||||
|
|
||||||
msg = init_msg[symbol]
|
msg = init_msg[symbol]
|
||||||
bfqsn = msg['fqsn']
|
bfqsn = msg['fqsn'].lower()
|
||||||
|
|
||||||
# true fqsn
|
# true fqsn
|
||||||
fqsn = '.'.join([bfqsn, brokername])
|
fqsn = '.'.join([bfqsn, brokername])
|
||||||
|
@ -864,7 +894,10 @@ async def maybe_open_feed(
|
||||||
|
|
||||||
**kwargs,
|
**kwargs,
|
||||||
|
|
||||||
) -> (Feed, ReceiveChannel[dict[str, Any]]):
|
) -> (
|
||||||
|
Feed,
|
||||||
|
ReceiveChannel[dict[str, Any]],
|
||||||
|
):
|
||||||
'''
|
'''
|
||||||
Maybe open a data to a ``brokerd`` daemon only if there is no
|
Maybe open a data to a ``brokerd`` daemon only if there is no
|
||||||
local one for the broker-symbol pair, if one is cached use it wrapped
|
local one for the broker-symbol pair, if one is cached use it wrapped
|
||||||
|
@ -885,6 +918,7 @@ async def maybe_open_feed(
|
||||||
'start_stream': kwargs.get('start_stream', True),
|
'start_stream': kwargs.get('start_stream', True),
|
||||||
},
|
},
|
||||||
key=fqsn,
|
key=fqsn,
|
||||||
|
|
||||||
) as (cache_hit, feed):
|
) as (cache_hit, feed):
|
||||||
|
|
||||||
if cache_hit:
|
if cache_hit:
|
||||||
|
|
Loading…
Reference in New Issue