Add diffing logic to `tsdb_history_update()`
Add some basic `numpy` epoch slice logic to generate append and prepend arrays to write to the db. Mooar cool things, - add a `Storage.delete_ts()` method to wipe a column series from the db easily. - don't attempt to read in any OHLC series by default on client load - add some `pyqtgraph` profiling and drop manual latency measures - if no db series for the fqsn exists write the entire shm arrayincr_update_backup
parent
1837e467be
commit
a682887e63
|
@ -29,14 +29,13 @@ from typing import (
|
||||||
Any,
|
Any,
|
||||||
Optional,
|
Optional,
|
||||||
Union,
|
Union,
|
||||||
# Callable,
|
|
||||||
# TYPE_CHECKING,
|
|
||||||
)
|
)
|
||||||
import time
|
import time
|
||||||
from math import isnan
|
from math import isnan
|
||||||
|
|
||||||
from bidict import bidict
|
from bidict import bidict
|
||||||
import msgpack
|
import msgpack
|
||||||
|
import pyqtgraph as pg
|
||||||
import numpy as np
|
import numpy as np
|
||||||
import pandas as pd
|
import pandas as pd
|
||||||
import tractor
|
import tractor
|
||||||
|
@ -49,15 +48,8 @@ from anyio_marketstore import (
|
||||||
import purerpc
|
import purerpc
|
||||||
|
|
||||||
from .feed import maybe_open_feed
|
from .feed import maybe_open_feed
|
||||||
from ._source import (
|
|
||||||
mk_fqsn,
|
|
||||||
# Symbol,
|
|
||||||
)
|
|
||||||
from ..log import get_logger, get_console_log
|
from ..log import get_logger, get_console_log
|
||||||
|
|
||||||
# if TYPE_CHECKING:
|
|
||||||
# from ._sharedmem import ShmArray
|
|
||||||
|
|
||||||
|
|
||||||
log = get_logger(__name__)
|
log = get_logger(__name__)
|
||||||
|
|
||||||
|
@ -235,6 +227,16 @@ class Storage:
|
||||||
# series' cache from tsdb reads
|
# series' cache from tsdb reads
|
||||||
self._arrays: dict[str, np.ndarray] = {}
|
self._arrays: dict[str, np.ndarray] = {}
|
||||||
|
|
||||||
|
async def list_keys(self) -> list[str]:
|
||||||
|
return await self.client.list_symbols()
|
||||||
|
|
||||||
|
async def search_keys(self, pattern: str) -> list[str]:
|
||||||
|
'''
|
||||||
|
Search for time series key in the storage backend.
|
||||||
|
|
||||||
|
'''
|
||||||
|
...
|
||||||
|
|
||||||
async def write_ticks(self, ticks: list) -> None:
|
async def write_ticks(self, ticks: list) -> None:
|
||||||
...
|
...
|
||||||
|
|
||||||
|
@ -262,7 +264,9 @@ class Storage:
|
||||||
for tfstr in tf_in_1s.values():
|
for tfstr in tf_in_1s.values():
|
||||||
try:
|
try:
|
||||||
log.info(f'querying for {tfstr}@{fqsn}')
|
log.info(f'querying for {tfstr}@{fqsn}')
|
||||||
result = await client.query(Params(fqsn, tfstr, 'OHLCV',))
|
result = await client.query(
|
||||||
|
Params(fqsn, tfstr, 'OHLCV',)
|
||||||
|
)
|
||||||
break
|
break
|
||||||
except purerpc.grpclib.exceptions.UnknownError:
|
except purerpc.grpclib.exceptions.UnknownError:
|
||||||
# XXX: this is already logged by the container and
|
# XXX: this is already logged by the container and
|
||||||
|
@ -276,6 +280,9 @@ class Storage:
|
||||||
tfstr = tf_in_1s[timeframe]
|
tfstr = tf_in_1s[timeframe]
|
||||||
result = await client.query(Params(fqsn, tfstr, 'OHLCV',))
|
result = await client.query(Params(fqsn, tfstr, 'OHLCV',))
|
||||||
|
|
||||||
|
# TODO: it turns out column access on recarrays is actually slower:
|
||||||
|
# https://jakevdp.github.io/PythonDataScienceHandbook/02.09-structured-data-numpy.html#RecordArrays:-Structured-Arrays-with-a-Twist
|
||||||
|
# it might make sense to make these structured arrays?
|
||||||
# Fill out a `numpy` array-results map
|
# Fill out a `numpy` array-results map
|
||||||
arrays = {}
|
arrays = {}
|
||||||
for fqsn, data_set in result.by_symbols().items():
|
for fqsn, data_set in result.by_symbols().items():
|
||||||
|
@ -283,7 +290,22 @@ class Storage:
|
||||||
tf_in_1s.inverse[data_set.timeframe]
|
tf_in_1s.inverse[data_set.timeframe]
|
||||||
] = data_set.array
|
] = data_set.array
|
||||||
|
|
||||||
return arrays[fqsn][timeframe] if timeframe else arrays
|
return arrays[fqsn][timeframe] if timeframe else arrays[fqsn]
|
||||||
|
|
||||||
|
async def delete_ts(
|
||||||
|
self,
|
||||||
|
key: str,
|
||||||
|
timeframe: Optional[Union[int, str]] = None,
|
||||||
|
|
||||||
|
) -> bool:
|
||||||
|
|
||||||
|
client = self.client
|
||||||
|
syms = await client.list_symbols()
|
||||||
|
print(syms)
|
||||||
|
# if key not in syms:
|
||||||
|
# raise KeyError(f'`{fqsn}` table key not found?')
|
||||||
|
|
||||||
|
return await client.destroy(tbk=key)
|
||||||
|
|
||||||
|
|
||||||
@acm
|
@acm
|
||||||
|
@ -296,19 +318,16 @@ async def open_storage_client(
|
||||||
Load a series by key and deliver in ``numpy`` struct array format.
|
Load a series by key and deliver in ``numpy`` struct array format.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
async with get_client() as client:
|
async with (
|
||||||
|
# eventually a storage backend endpoint
|
||||||
storage_client = Storage(client)
|
get_client() as client,
|
||||||
arrays = await storage_client.read_ohlcv(
|
):
|
||||||
fqsn,
|
# slap on our wrapper api
|
||||||
period,
|
yield Storage(client)
|
||||||
)
|
|
||||||
|
|
||||||
yield storage_client, arrays
|
|
||||||
|
|
||||||
|
|
||||||
async def backfill_history_diff(
|
async def tsdb_history_update(
|
||||||
# symbol: Symbol
|
fqsn: str,
|
||||||
|
|
||||||
) -> list[str]:
|
) -> list[str]:
|
||||||
|
|
||||||
|
@ -338,108 +357,92 @@ async def backfill_history_diff(
|
||||||
# * the original data feed arch blurb:
|
# * the original data feed arch blurb:
|
||||||
# - https://github.com/pikers/piker/issues/98
|
# - https://github.com/pikers/piker/issues/98
|
||||||
#
|
#
|
||||||
|
profiler = pg.debug.Profiler(
|
||||||
broker = 'ib'
|
disabled=False, # not pg_profile_enabled(),
|
||||||
symbol = 'mnq.globex'
|
delayed=False,
|
||||||
|
)
|
||||||
# broker = 'binance'
|
|
||||||
# symbol = 'btcusdt'
|
|
||||||
|
|
||||||
fqsn = mk_fqsn(broker, symbol)
|
|
||||||
|
|
||||||
async with (
|
async with (
|
||||||
get_client() as client,
|
open_storage_client(fqsn) as storage,
|
||||||
|
|
||||||
maybe_open_feed(
|
maybe_open_feed(
|
||||||
broker,
|
[fqsn],
|
||||||
[symbol],
|
|
||||||
loglevel='info',
|
|
||||||
# backpressure=False,
|
|
||||||
start_stream=False,
|
start_stream=False,
|
||||||
|
|
||||||
) as (feed, stream),
|
) as (feed, stream),
|
||||||
):
|
):
|
||||||
syms = await client.list_symbols()
|
profiler(f'opened feed for {fqsn}')
|
||||||
log.info(f'Existing symbol set:\n{pformat(syms)}')
|
|
||||||
|
symbol = feed.symbols.get(fqsn)
|
||||||
|
if symbol:
|
||||||
|
fqsn = symbol.front_fqsn()
|
||||||
|
|
||||||
|
syms = await storage.client.list_symbols()
|
||||||
|
log.info(f'Existing tsdb symbol set:\n{pformat(syms)}')
|
||||||
|
profiler(f'listed symbols {syms}')
|
||||||
|
|
||||||
# diff db history with shm and only write the missing portions
|
# diff db history with shm and only write the missing portions
|
||||||
ohlcv = feed.shm.array
|
ohlcv = feed.shm.array
|
||||||
|
|
||||||
key = (fqsn, '1Sec', 'OHLCV')
|
# TODO: use pg profiler
|
||||||
tbk = mk_tbk(key)
|
tsdb_arrays = await storage.read_ohlcv(fqsn)
|
||||||
|
|
||||||
# diff vs. existing array and append new history
|
to_append = feed.shm.array
|
||||||
# TODO:
|
to_prepend = None
|
||||||
|
|
||||||
# TODO: should be no error?
|
# hist diffing
|
||||||
# assert not resp.responses
|
if tsdb_arrays:
|
||||||
|
onesec = tsdb_arrays[1]
|
||||||
|
to_append = ohlcv[ohlcv['time'] > onesec['Epoch'][-1]]
|
||||||
|
to_prepend = ohlcv[ohlcv['time'] < onesec['Epoch'][0]]
|
||||||
|
|
||||||
start = time.time()
|
profiler('Finished db arrays diffs')
|
||||||
|
|
||||||
qr = await client.query(
|
for array in [to_append, to_prepend]:
|
||||||
# Params(fqsn, '1Sec`', 'OHLCV',)
|
if array is None:
|
||||||
Params(*key),
|
continue
|
||||||
)
|
|
||||||
# # Dig out `numpy` results map
|
|
||||||
arrays: dict[tuple[str, int], np.ndarray] = {}
|
|
||||||
for name, data_set in qr.by_symbols().items():
|
|
||||||
in_secs = tf_in_1s.inverse[data_set.timeframe]
|
|
||||||
arrays[(name, in_secs)] = data_set.array
|
|
||||||
|
|
||||||
s1 = arrays[(fqsn, 1)]
|
log.info(
|
||||||
to_append = ohlcv[ohlcv['time'] > s1['Epoch'][-1]]
|
f'Writing datums {array.size} -> to tsdb from shm\n'
|
||||||
|
)
|
||||||
|
|
||||||
end_diff = time.time()
|
# build mkts schema compat array for writing
|
||||||
diff_ms = round((end_diff - start) * 1e3, ndigits=2)
|
mkts_dt = np.dtype(_ohlcv_dt)
|
||||||
|
mkts_array = np.zeros(
|
||||||
|
len(array),
|
||||||
|
dtype=mkts_dt,
|
||||||
|
)
|
||||||
|
# copy from shm array (yes it's this easy):
|
||||||
|
# https://numpy.org/doc/stable/user/basics.rec.html#assignment-from-other-structured-arrays
|
||||||
|
mkts_array[:] = array[[
|
||||||
|
'time',
|
||||||
|
'open',
|
||||||
|
'high',
|
||||||
|
'low',
|
||||||
|
'close',
|
||||||
|
'volume',
|
||||||
|
]]
|
||||||
|
|
||||||
log.info(
|
# write to db
|
||||||
f'Appending {to_append.size} datums to tsdb from shm\n'
|
resp = await storage.client.write(
|
||||||
f'Total diff time: {diff_ms} ms'
|
mkts_array,
|
||||||
)
|
tbk=f'{fqsn}/1Sec/OHLCV',
|
||||||
|
|
||||||
# build mkts schema compat array for writing
|
# NOTE: will will append duplicates
|
||||||
mkts_dt = np.dtype(_ohlcv_dt)
|
# for the same timestamp-index.
|
||||||
mkts_array = np.zeros(
|
# TODO: pre deduplicate?
|
||||||
len(to_append),
|
isvariablelength=True,
|
||||||
dtype=mkts_dt,
|
)
|
||||||
)
|
|
||||||
# copy from shm array (yes it's this easy):
|
|
||||||
# https://numpy.org/doc/stable/user/basics.rec.html#assignment-from-other-structured-arrays
|
|
||||||
mkts_array[:] = to_append[[
|
|
||||||
'time',
|
|
||||||
'open',
|
|
||||||
'high',
|
|
||||||
'low',
|
|
||||||
'close',
|
|
||||||
'volume',
|
|
||||||
]]
|
|
||||||
|
|
||||||
# write to db
|
log.info(
|
||||||
resp = await client.write(
|
f'Wrote {to_append.size} datums to tsdb\n'
|
||||||
mkts_array,
|
)
|
||||||
tbk=tbk,
|
profiler('Finished db writes')
|
||||||
# NOTE: will will append duplicates
|
|
||||||
# for the same timestamp-index.
|
|
||||||
isvariablelength=True,
|
|
||||||
)
|
|
||||||
end_write = time.time()
|
|
||||||
diff_ms = round((end_write - end_diff) * 1e3, ndigits=2)
|
|
||||||
log.info(
|
|
||||||
f'Wrote {to_append.size} datums to tsdb\n'
|
|
||||||
f'Total write time: {diff_ms} ms'
|
|
||||||
)
|
|
||||||
for resp in resp.responses:
|
|
||||||
err = resp.error
|
|
||||||
if err:
|
|
||||||
raise MarketStoreError(err)
|
|
||||||
|
|
||||||
# TODO: backfiller loop
|
for resp in resp.responses:
|
||||||
from piker.ui._compression import downsample
|
err = resp.error
|
||||||
x, y = downsample(
|
if err:
|
||||||
s1['Epoch'],
|
raise MarketStoreError(err)
|
||||||
s1['Close'],
|
|
||||||
bins=10,
|
|
||||||
)
|
|
||||||
await tractor.breakpoint()
|
|
||||||
|
|
||||||
|
|
||||||
async def ingest_quote_stream(
|
async def ingest_quote_stream(
|
||||||
|
|
Loading…
Reference in New Issue