Compare commits


10 Commits

Author SHA1 Message Date
Tyler Goodlet c3553de1f2 Handle non-fqsn for derivs and don't put brokername in 2022-04-13 08:11:57 -04:00
Tyler Goodlet 0790ae8235 Limit ohlc queries to 800k datums to avoid `purepc` size error 2022-04-13 08:11:57 -04:00
Tyler Goodlet a6b8c03e0e Get sync-to-marketstore-tsdb history retrieval workinnn 2022-04-13 08:11:57 -04:00
Tyler Goodlet ac22190b60 Handle "fatal" level log msgs in docker super 2022-04-13 08:11:57 -04:00
Tyler Goodlet 75d0bf3152 Add basic tsdb history loading
If `marketstore` is detected try to only load most recent missing data
from the data provider (broker) and the rest from the tsdb and push it
all to shm for display in the UI. If the provider/broker doesn't have
the history client endpoint, just use the old one for now so we can
start to incrementally add support. Don't start the ohlc step
incrementer task until the backend signals that the feed is live.
2022-04-13 08:11:57 -04:00
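Note: a minimal sketch of the split described in the commit above, assuming plain numpy epoch-indexed arrays rather than piker's actual shm/tsdb types — keep whatever the tsdb already holds and only take strictly newer rows from the provider before pushing to the display buffer.

```python
# Illustrative sketch only (not piker's actual API): merge tsdb history with a
# freshly queried provider frame, taking only rows the db has not seen yet.
import numpy as np

ohlc_dtype = np.dtype([('time', 'f8'), ('close', 'f8')])

def merge_history(tsdb: np.ndarray, provider: np.ndarray) -> np.ndarray:
    if not tsdb.size:
        # no db history at all -> fall back to the provider frame wholesale
        return provider
    last_tsdb_t = tsdb['time'][-1]
    # only rows strictly newer than what the db already stores
    new = provider[provider['time'] > last_tsdb_t]
    return np.concatenate([tsdb, new])

tsdb = np.array([(1.0, 10.0), (2.0, 11.0)], dtype=ohlc_dtype)
provider = np.array([(2.0, 11.0), (3.0, 12.0)], dtype=ohlc_dtype)
print(merge_history(tsdb, provider))  # rows at t=1, 2, 3
```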
Tyler Goodlet 85e2602d2e Drop `ms-shell`, add `piker storesh` cmd 2022-04-13 08:11:57 -04:00
Tyler Goodlet edcac1e768 Add diffing logic to `tsdb_history_update()`
Add some basic `numpy` epoch slice logic to generate append and prepend
arrays to write to the db.

Mooar cool things,
- add a `Storage.delete_ts()` method to wipe a column series from the db
  easily.
- don't attempt to read in any OHLC series by default on client load
- add some `pyqtgraph` profiling and drop manual latency measures
- if no db series for the fqsn exists write the entire shm array
2022-04-13 08:11:57 -04:00
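Note: a hedged sketch of that epoch-slice diffing, with illustrative names (`epoch_diffs` is not the real helper) — rows older than the first stored epoch become the prepend array, rows newer than the last stored epoch become the append array.

```python
# Sketch: derive append/prepend slices from a shm OHLC array given the epoch
# range already present in the db, using plain numpy boolean masks.
import numpy as np

ohlc_dtype = np.dtype([('time', 'f8'), ('close', 'f8')])

def epoch_diffs(shm_array: np.ndarray, tsdb_epochs: np.ndarray):
    '''Return (to_prepend, to_append) relative to the stored epoch range.'''
    first, last = tsdb_epochs[0], tsdb_epochs[-1]
    to_prepend = shm_array[shm_array['time'] < first]
    to_append = shm_array[shm_array['time'] > last]
    return to_prepend, to_append

shm = np.array(
    [(0.0, 9.0), (1.0, 10.0), (2.0, 11.0), (3.0, 12.0)],
    dtype=ohlc_dtype,
)
tsdb_epochs = np.array([1.0, 2.0])
pre, post = epoch_diffs(shm, tsdb_epochs)
print(pre['time'], post['time'])  # [0.] [3.]
```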
Tyler Goodlet 73b8719984 Drop `pandas` to `numpy` converter 2022-04-13 08:11:57 -04:00
Tyler Goodlet 6b17370711 Disable re-connect for now in ib script 2022-04-13 08:11:57 -04:00
Tyler Goodlet cd3dbc9275 Ensure bfqsn is lower cased for feed api consumers
Also, Start tinkering with `tractor.trionics.ipython_embed()`

In effort to get back to a usable REPL around the mkts client
this adds usage of the new `tractor` integration api as well as logic
for skipping backfilling if existing tsdb arrays are found.
2022-04-13 08:11:57 -04:00
6 changed files with 309 additions and 314 deletions

View File

@@ -24,7 +24,6 @@ from typing import (
     # Any,
 )
 from contextlib import asynccontextmanager as acm
-# import time
 import trio
 from trio_typing import TaskStatus
@@ -97,6 +96,7 @@ async def open_docker(
             base_url=url,
             **kwargs
         ) if url else docker.from_env(**kwargs)
         yield client
     except (
@@ -127,43 +127,10 @@ async def open_docker(
     finally:
         if client:
-            # for c in client.containers.list():
-            #     c.kill()
             client.close()
             # client.api._custom_adapter.close()
-        for c in client.containers.list():
-            c.kill()
-# async def waitfor(
-#     cntr: Container,
-#     attr_path: tuple[str],
-#     expect=None,
-#     timeout: float = 0.5,
-# ) -> Any:
-#     '''
-#     Wait for a container's attr value to be set. If ``expect`` is
-#     provided wait for the value to be set to that value.
-#     This is an async version of the helper from our ``pytest-dockerctl``
-#     plugin.
-#     '''
-#     def get(val, path):
-#         for key in path:
-#             val = val[key]
-#         return val
-#     start = time.time()
-#     while time.time() - start < timeout:
-#         cntr.reload()
-#         val = get(cntr.attrs, attr_path)
-#         if expect is None and val:
-#             return val
-#         elif val == expect:
-#             return val
-#     else:
-#         raise TimeoutError("{} failed to be {}, value: \"{}\"".format(
-#             attr_path, expect if expect else 'not None', val))
 @tractor.context
@@ -220,7 +187,7 @@ async def open_marketstored(
         },
         mounts=[config_dir_mnt, data_dir_mnt],
         detach=True,
-        stop_signal='SIGINT',
+        # stop_signal='SIGINT',
         init=True,
         # remove=True,
     )
@@ -247,7 +214,7 @@ async def open_marketstored(
             seen_so_far.add(entry)
             if bp_on_msg:
                 await tractor.breakpoint()
-            getattr(log, level)(f'{msg}')
+            getattr(log, level, log.error)(f'{msg}')
             # if "launching tcp listener for all services..." in msg:
             if match in msg:
@@ -269,6 +236,8 @@ async def open_marketstored(
     )
     await ctx.started(cntr.id)
+    # block for the expected "teardown log msg"..
     await process_logs_until('exiting...',)
     except (

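Note: the `getattr(log, level, log.error)` change above is what lets the docker supervisor relay container log levels that the Python logger has no matching method for (the "fatal" case named in the commit list). A small stand-alone sketch of the same fallback pattern, using a stdlib logger and a made-up 'notice' level as the stand-in:

```python
# Sketch: dispatch a log line by level name, falling back to `log.error`
# when the logger has no method for that level instead of raising
# AttributeError.
import logging

logging.basicConfig(level=logging.DEBUG)
log = logging.getLogger('ahab-sketch')

def relay(level: str, msg: str) -> None:
    # the 3rd `getattr` argument is the fallback callable
    getattr(log, level, log.error)(msg)

relay('info', 'normal container line')
relay('notice', "stdlib loggers have no .notice() method -> routed to error")
```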
View File

@@ -22,8 +22,7 @@ from typing import Any
 import decimal
 import numpy as np
-import pandas as pd
-from pydantic import BaseModel, validate_arguments
+from pydantic import BaseModel
 # from numba import from_dtype
@@ -254,61 +253,6 @@ class Symbol(BaseModel):
         return keys
-def from_df(
-    df: pd.DataFrame,
-    source=None,
-    default_tf=None
-) -> np.recarray:
-    """Convert OHLC formatted ``pandas.DataFrame`` to ``numpy.recarray``.
-    """
-    df.reset_index(inplace=True)
-    # hackery to convert field names
-    date = 'Date'
-    if 'date' in df.columns:
-        date = 'date'
-    # convert to POSIX time
-    df[date] = [d.timestamp() for d in df[date]]
-    # try to rename from some camel case
-    columns = {
-        'Date': 'time',
-        'date': 'time',
-        'Open': 'open',
-        'High': 'high',
-        'Low': 'low',
-        'Close': 'close',
-        'Volume': 'volume',
-        # most feeds are providing this over sesssion anchored
-        'vwap': 'bar_wap',
-        # XXX: ib_insync calls this the "wap of the bar"
-        # but no clue what is actually is...
-        # https://github.com/pikers/piker/issues/119#issuecomment-729120988
-        'average': 'bar_wap',
-    }
-    df = df.rename(columns=columns)
-    for name in df.columns:
-        # if name not in base_ohlc_dtype.names[1:]:
-        if name not in base_ohlc_dtype.names:
-            del df[name]
-    # TODO: it turns out column access on recarrays is actually slower:
-    # https://jakevdp.github.io/PythonDataScienceHandbook/02.09-structured-data-numpy.html#RecordArrays:-Structured-Arrays-with-a-Twist
-    # it might make sense to make these structured arrays?
-    array = df.to_records(index=False)
-    _nan_to_closest_num(array)
-    return array
 def _nan_to_closest_num(array: np.ndarray):
     """Return interpolated values instead of NaN.

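Note: with `from_df` dropped, OHLC frames are expected to arrive as numpy structured arrays directly. A minimal sketch, with an illustrative dtype rather than piker's exact `base_ohlc_dtype`, of building such an array without any pandas round-trip:

```python
# Sketch: construct an OHLC structured array straight from row tuples;
# field access then works by name, no DataFrame conversion required.
import numpy as np

ohlc_dtype = np.dtype([
    ('time', 'f8'),
    ('open', 'f8'), ('high', 'f8'), ('low', 'f8'), ('close', 'f8'),
    ('volume', 'f8'),
])

rows = [
    (1649851200.0, 10.0, 12.0, 9.0, 11.0, 100.0),
    (1649851260.0, 11.0, 13.0, 10.0, 12.0, 80.0),
]
array = np.array(rows, dtype=ohlc_dtype)

print(array['close'].max())  # 12.0
```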
View File

@@ -126,7 +126,7 @@ def ms_stream(
 )
 @click.argument('symbols', nargs=-1)
 @click.pass_obj
-def ms_shell(
+def storesh(
     config,
     tl,
     host,
@@ -137,43 +137,18 @@ def ms_shell(
     Start an IPython shell ready to query the local marketstore db.
     '''
-    from piker.data.marketstore import backfill_history_diff
+    from piker.data.marketstore import tsdb_history_update
     from piker._daemon import open_piker_runtime
     async def main():
         nonlocal symbols
         async with open_piker_runtime(
-            'ms_shell',
+            'storesh',
             enable_modules=['piker.data._ahab'],
         ):
-            try:
-                await backfill_history_diff()
-            except OSError:
-                # TODO: write magics to query marketstore
-                sym = symbols[0]
-                symbol, _, broker = sym.rpartition('.')
-                # (maybe) allocate shm array for this broker/symbol which will
-                # be used for fast near-term history capture and processing.
-                shm, opened = maybe_open_shm_array(
-                    key=sym,
-                    dtype=base_iohlc_dtype,
-                )
-                # load anything found in shm
-                from numpy.lib.recfunctions import structured_to_unstructured
-                mxmn = structured_to_unstructured(
-                    shm.array[['low', 'high']],
-                    # dtype=[('mxmn', '<f8'), ('index', '<i8')],
-                ).flatten()
-                from piker.ui._compression import downsample
-                xd, yd = downsample(
-                    y=mxmn,
-                    x=np.arange(len(mxmn)),
-                    bins=4,
-                )
-                await tractor.breakpoint()
+            symbol = symbols[0]
+            await tsdb_history_update(symbol)
     trio.run(main)
@@ -184,10 +159,11 @@ def ms_shell(
 @click.argument('name', nargs=1, required=True)
 @click.pass_obj
 def ingest(config, name, test_file, tl):
-    """Ingest real-time broker quotes and ticks to a marketstore instance.
-    """
+    '''
+    Ingest real-time broker quotes and ticks to a marketstore instance.
+    '''
     # global opts
+    brokermods = config['brokermods']
     loglevel = config['loglevel']
     tractorloglevel = config['tractorloglevel']
     # log = config['log']

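Note: the `storesh` command above follows the usual click-to-trio wiring — the click callback defines an async `main()` and hands it to `trio.run()`. A stripped-down sketch of that pattern (the body is a stand-in, not the real `open_piker_runtime`/`tsdb_history_update` calls):

```python
# Sketch: a click command whose body is an async trio entrypoint.
import click
import trio

@click.command()
@click.argument('symbols', nargs=-1)
def storesh_sketch(symbols):
    '''Open an async context and do per-symbol work.'''
    async def main():
        # real code would enter `open_piker_runtime(...)` here
        for sym in symbols:
            print(f'would update tsdb history for {sym}')
            await trio.sleep(0)

    trio.run(main)

if __name__ == '__main__':
    storesh_sketch()
```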
View File

@@ -22,7 +22,6 @@ This module is enabled for ``brokerd`` daemons.
 """
 from __future__ import annotations
 from dataclasses import dataclass, field
-from datetime import datetime
 from contextlib import asynccontextmanager
 from functools import partial
 from types import ModuleType
@@ -32,11 +31,14 @@ from typing import (
     Awaitable,
 )
+import pendulum
 import trio
 from trio.abc import ReceiveChannel
 from trio_typing import TaskStatus
 import tractor
 from pydantic import BaseModel
+import numpy as np
 from ..brokers import get_brokermod
 from .._cacheables import maybe_open_context
@@ -49,7 +51,6 @@ from ._sharedmem import (
     maybe_open_shm_array,
     attach_shm_array,
     ShmArray,
-    _secs_in_day,
 )
 from .ingest import get_ingestormod
 from ._source import (
@@ -192,6 +193,22 @@ async def _setup_persistent_brokerd(
         await trio.sleep_forever()
+async def start_backfill(
+    mod: ModuleType,
+    fqsn: str,
+    shm: ShmArray,
+    task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,
+) -> int:
+    return await mod.backfill_bars(
+        fqsn,
+        shm,
+        task_status=task_status,
+    )
 async def manage_history(
     mod: ModuleType,
     bus: _FeedsBus,
@@ -220,104 +237,150 @@ async def manage_history(
         # we expect the sub-actor to write
         readonly=False,
     )
+    # TODO: history validation
+    if not opened:
+        raise RuntimeError(
+            "Persistent shm for sym was already open?!"
+        )
     log.info('Scanning for existing `marketstored`')
     is_up = await check_for_service('marketstored')
-    if is_up and opened:
+    # for now only do backfilling if no tsdb can be found
+    do_legacy_backfill = not is_up and opened
+    open_history_client = getattr(mod, 'open_history_client', None)
+    if is_up and opened and open_history_client:
         log.info('Found existing `marketstored`')
         from . import marketstore
         async with marketstore.open_storage_client(
             fqsn,
-        ) as (storage, tsdb_arrays):
-            # TODO: history validation
-            # assert opened, f'Persistent shm for {symbol} was already open?!'
-            # if not opened:
-            #     raise RuntimeError(
-            #         "Persistent shm for sym was already open?!"
-            #     )
-            if tsdb_arrays:
+        ) as storage:
+            tsdb_arrays = await storage.read_ohlcv(fqsn)
+            if not tsdb_arrays:
+                do_legacy_backfill = True
+            else:
                 log.info(f'Loaded tsdb history {tsdb_arrays}')
-                fastest = list(tsdb_arrays[fqsn].values())[0]
-                last_s = fastest['Epoch'][-1]
-                # TODO: see if there's faster multi-field reads:
-                # https://numpy.org/doc/stable/user/basics.rec.html#accessing-multiple-fields
-                # re-index with a `time` and index field
-                shm.push(
-                    fastest[-3 * _secs_in_day:],
-                    # insert the history pre a "days worth" of samples
-                    # to leave some real-time buffer space at the end.
-                    prepend=True,
-                    start=shm._len - _secs_in_day,
-                    field_map={
-                        'Epoch': 'time',
-                        'Open': 'open',
-                        'High': 'high',
-                        'Low': 'low',
-                        'Close': 'close',
-                        'Volume': 'volume',
-                    },
-                )
+                fastest = list(tsdb_arrays.values())[0]
+                times = fastest['Epoch']
+                first, last = times[0], times[-1]
+                first_tsdb_dt, last_tsdb_dt = map(
+                    pendulum.from_timestamp, [first, last]
+                )
+                # TODO: this should be used verbatim for the pure
+                # shm backfiller approach below.
+                def diff_history(
+                    array,
+                    start_dt,
+                    end_dt,
+                ) -> np.ndarray:
+                    s_diff = (last_tsdb_dt - start_dt).seconds
+                    # if we detect a partial frame's worth of data
+                    # that is new, slice out only that history and
+                    # write to shm.
+                    if s_diff > 0:
+                        assert last_tsdb_dt > start_dt
+                        selected = array['time'] > last_tsdb_dt.timestamp()
+                        to_push = array[selected]
+                        log.info(
+                            f'Pushing partial frame {to_push.size} to shm'
+                        )
+                        return to_push
+                    else:
+                        return array
                 # start history anal and load missing new data via backend.
-                async with mod.open_history_client(fqsn) as hist:
-                    # get latest query's worth of history
-                    array, next_dt = await hist(end_dt='')
-                    last_dt = datetime.fromtimestamp(last_s)
-                    array, next_dt = await hist(end_dt=last_dt)
-        some_data_ready.set()
-    elif opened:
+                async with open_history_client(fqsn) as hist:
+                    # get latest query's worth of history all the way
+                    # back to what is recorded in the tsdb
+                    array, start_dt, end_dt = await hist(end_dt='')
+                    to_push = diff_history(array, start_dt, end_dt)
+                    shm.push(to_push)
+                    # let caller unblock and deliver latest history frame
+                    task_status.started(shm)
+                    some_data_ready.set()
+                    # pull new history frames until we hit latest
+                    # already in the tsdb
+                    while start_dt > last_tsdb_dt:
+                        array, start_dt, end_dt = await hist(end_dt=start_dt)
+                        to_push = diff_history(array, start_dt, end_dt)
+                        shm.push(to_push, prepend=True)
+                # TODO: see if there's faster multi-field reads:
+                # https://numpy.org/doc/stable/user/basics.rec.html#accessing-multiple-fields
+                # re-index with a `time` and index field
+                shm.push(
+                    fastest[-shm._first.value:],
+                    # insert the history pre a "days worth" of samples
+                    # to leave some real-time buffer space at the end.
+                    prepend=True,
+                    # start=shm._len - _secs_in_day,
+                    field_map={
+                        'Epoch': 'time',
+                        'Open': 'open',
+                        'High': 'high',
+                        'Low': 'low',
+                        'Close': 'close',
+                        'Volume': 'volume',
+                    },
+                )
+        # TODO: write new data to tsdb to be ready to for next
+        # read.
+    if do_legacy_backfill:
+        # do a legacy incremental backfill from the provider.
         log.info('No existing `marketstored` found..')
+        bfqsn = fqsn.replace('.' + mod.name, '')
         # start history backfill task ``backfill_bars()`` is
         # a required backend func this must block until shm is
         # filled with first set of ohlc bars
-        _ = await bus.nursery.start(mod.backfill_bars, fqsn, shm)
+        await bus.nursery.start(
+            start_backfill,
+            mod,
+            bfqsn,
+            shm,
+        )
        # yield back after client connect with filled shm
        task_status.started(shm)
        # indicate to caller that feed can be delivered to
        # remote requesting client since we've loaded history
        # data that can be used.
        some_data_ready.set()
-    # detect sample step size for sampled historical data
-    times = shm.array['time']
-    delay_s = times[-1] - times[times != times[-1]][-1]
-    # begin real-time updates of shm and tsb once the feed
-    # goes live.
-    await feed_is_live.wait()
-    if opened:
-        sampler.ohlcv_shms.setdefault(delay_s, []).append(shm)
-        # start shm incrementing for OHLC sampling at the current
-        # detected sampling period if one dne.
-        if sampler.incrementers.get(delay_s) is None:
-            await bus.start_task(
-                increment_ohlc_buffer,
-                delay_s,
-            )
+    # history retreival loop depending on user interaction and thus
+    # a small RPC-prot for remotely controllinlg what data is loaded
+    # for viewing.
     await trio.sleep_forever()
-    # cs.cancel()
 async def allocate_persistent_feed(
     bus: _FeedsBus,
     brokername: str,
     symbol: str,
     loglevel: str,
     start_stream: bool = True,
@@ -337,6 +400,7 @@ async def allocate_persistent_feed(
     - a real-time streaming task which connec
     '''
+    # load backend module
     try:
         mod = get_brokermod(brokername)
     except ImportError:
@@ -361,8 +425,10 @@ async def allocate_persistent_feed(
             loglevel=loglevel,
         )
     )
-    # the broker-specific fully qualified symbol name
-    bfqsn = init_msg[symbol]['fqsn']
+    # the broker-specific fully qualified symbol name,
+    # but ensure it is lower-cased for external use.
+    bfqsn = init_msg[symbol]['fqsn'].lower()
+    init_msg[symbol]['fqsn'] = bfqsn
     # HISTORY, run 2 tasks:
     # - a history loader / maintainer
@@ -377,7 +443,7 @@ async def allocate_persistent_feed(
             manage_history,
             mod,
             bus,
-            bfqsn,
+            '.'.join((bfqsn, brokername)),
             some_data_ready,
             feed_is_live,
         )
@@ -390,9 +456,11 @@ async def allocate_persistent_feed(
     # true fqsn
     fqsn = '.'.join((bfqsn, brokername))
     # add a fqsn entry that includes the ``.<broker>`` suffix
+    # and an entry that includes the broker-specific fqsn (including
+    # any new suffixes or elements as injected by the backend).
     init_msg[fqsn] = msg
+    init_msg[bfqsn] = msg
     # TODO: pretty sure we don't need this? why not just leave 1s as
     # the fastest "sample period" since we'll probably always want that
@@ -406,13 +474,14 @@ async def allocate_persistent_feed(
     await some_data_ready.wait()
     # append ``.<broker>`` suffix to each quote symbol
-    bsym = symbol + f'.{brokername}'
+    acceptable_not_fqsn_with_broker_suffix = symbol + f'.{brokername}'
     generic_first_quotes = {
-        bsym: first_quote,
+        acceptable_not_fqsn_with_broker_suffix: first_quote,
         fqsn: first_quote,
     }
-    bus.feeds[symbol] = bus.feeds[fqsn] = (
+    bus.feeds[symbol] = bus.feeds[bfqsn] = (
         init_msg,
         generic_first_quotes,
     )
@@ -425,9 +494,22 @@ async def allocate_persistent_feed(
     if not start_stream:
         await trio.sleep_forever()
-    # backend will indicate when real-time quotes have begun.
+    # begin real-time updates of shm and tsb once the feed goes live and
+    # the backend will indicate when real-time quotes have begun.
     await feed_is_live.wait()
+    # start shm incrementer task for OHLC style sampling
+    # at the current detected step period.
+    times = shm.array['time']
+    delay_s = times[-1] - times[times != times[-1]][-1]
+    sampler.ohlcv_shms.setdefault(delay_s, []).append(shm)
+    if sampler.incrementers.get(delay_s) is None:
+        await bus.start_task(
+            increment_ohlc_buffer,
+            delay_s,
+        )
     sum_tick_vlm: bool = init_msg.get(
         'shm_write_opts', {}
     ).get('sum_tick_vlm', True)
@@ -450,7 +532,7 @@ async def open_feed_bus(
     ctx: tractor.Context,
     brokername: str,
-    symbol: str,
+    symbol: str,  # normally expected to the broker-specific fqsn
     loglevel: str,
     tick_throttle: Optional[float] = None,
     start_stream: bool = True,
@@ -472,7 +554,9 @@ async def open_feed_bus(
     # TODO: check for any stale shm entries for this symbol
     # (after we also group them in a nice `/dev/shm/piker/` subdir).
     # ensure we are who we think we are
-    assert 'brokerd' in tractor.current_actor().name
+    servicename = tractor.current_actor().name
+    assert 'brokerd' in servicename
+    assert brokername in servicename
     bus = get_feed_bus(brokername)
@@ -482,7 +566,7 @@ async def open_feed_bus(
     entry = bus.feeds.get(symbol)
     if entry is None:
         # allocate a new actor-local stream bus which
-        # will persist for this `brokerd`.
+        # will persist for this `brokerd`'s service lifetime.
         async with bus.task_lock:
             await bus.nursery.start(
                 partial(
@@ -506,12 +590,12 @@ async def open_feed_bus(
     init_msg, first_quotes = bus.feeds[symbol]
     msg = init_msg[symbol]
-    bfqsn = msg['fqsn']
+    bfqsn = msg['fqsn'].lower()
     # true fqsn
     fqsn = '.'.join([bfqsn, brokername])
     assert fqsn in first_quotes
-    assert bus.feeds[fqsn]
+    assert bus.feeds[bfqsn]
     # broker-ambiguous symbol (provided on cli - eg. mnq.globex.ib)
     bsym = symbol + f'.{brokername}'
@@ -825,7 +909,10 @@ async def maybe_open_feed(
     **kwargs,
-) -> (Feed, ReceiveChannel[dict[str, Any]]):
+) -> (
+    Feed,
+    ReceiveChannel[dict[str, Any]],
+):
     '''
     Maybe open a data to a ``brokerd`` daemon only if there is no
     local one for the broker-symbol pair, if one is cached use it wrapped
@@ -846,6 +933,7 @@ async def maybe_open_feed(
             'start_stream': kwargs.get('start_stream', True),
         },
         key=fqsn,
     ) as (cache_hit, feed):
         if cache_hit:

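Note: one line worth calling out from the hunks above is the sampling-period detection, `delay_s = times[-1] - times[times != times[-1]][-1]` — it subtracts the most recent *distinct* timestamp from the last one, so a repeated tail value doesn't yield a zero step. A tiny runnable check:

```python
# Sketch: detect the OHLC bar step from a time column, tolerating a
# duplicated (still in-progress) tail timestamp.
import numpy as np

times = np.array([60.0, 120.0, 180.0, 240.0, 240.0])  # 1-minute bars, dup'd tail
delay_s = times[-1] - times[times != times[-1]][-1]
print(delay_s)  # 60.0
```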
View File

@@ -29,14 +29,13 @@ from typing import (
     Any,
     Optional,
     Union,
-    # Callable,
-    # TYPE_CHECKING,
 )
 import time
 from math import isnan
 from bidict import bidict
 import msgpack
+import pyqtgraph as pg
 import numpy as np
 import pandas as pd
 import tractor
@@ -49,15 +48,8 @@ from anyio_marketstore import (
 import purerpc
 from .feed import maybe_open_feed
-from ._source import (
-    mk_fqsn,
-    # Symbol,
-)
 from ..log import get_logger, get_console_log
-# if TYPE_CHECKING:
-#     from ._sharedmem import ShmArray
 log = get_logger(__name__)
@@ -235,6 +227,16 @@ class Storage:
         # series' cache from tsdb reads
         self._arrays: dict[str, np.ndarray] = {}
+    async def list_keys(self) -> list[str]:
+        return await self.client.list_symbols()
+
+    async def search_keys(self, pattern: str) -> list[str]:
+        '''
+        Search for time series key in the storage backend.
+        '''
+        ...
+
     async def write_ticks(self, ticks: list) -> None:
         ...
@@ -256,14 +258,29 @@ class Storage:
         if fqsn not in syms:
             return {}
+        tfstr = tf_in_1s[1]
+        params = Params(
+            symbols=fqsn,
+            timeframe=tfstr,
+            attrgroup='OHLCV',
+            # limit_from_start=True,
+            # TODO: figure the max limit here given the
+            # ``purepc`` msg size limit of purerpc: 33554432
+            limit=int(800e3),
+        )
         if timeframe is None:
             log.info(f'starting {fqsn} tsdb granularity scan..')
             # loop through and try to find highest granularity
             for tfstr in tf_in_1s.values():
                 try:
                     log.info(f'querying for {tfstr}@{fqsn}')
-                    result = await client.query(Params(fqsn, tfstr, 'OHLCV',))
+                    params.set('timeframe', tfstr)
+                    result = await client.query(params)
                     break
                 except purerpc.grpclib.exceptions.UnknownError:
                     # XXX: this is already logged by the container and
                     # thus shows up through `marketstored` logs relay.
@@ -273,9 +290,11 @@ class Storage:
                     return {}
         else:
-            tfstr = tf_in_1s[timeframe]
-            result = await client.query(Params(fqsn, tfstr, 'OHLCV',))
+            result = await client.query(params)
+        # TODO: it turns out column access on recarrays is actually slower:
+        # https://jakevdp.github.io/PythonDataScienceHandbook/02.09-structured-data-numpy.html#RecordArrays:-Structured-Arrays-with-a-Twist
+        # it might make sense to make these structured arrays?
         # Fill out a `numpy` array-results map
         arrays = {}
         for fqsn, data_set in result.by_symbols().items():
@@ -283,7 +302,22 @@ class Storage:
                 tf_in_1s.inverse[data_set.timeframe]
             ] = data_set.array
-        return arrays[fqsn][timeframe] if timeframe else arrays
+        return arrays[fqsn][timeframe] if timeframe else arrays[fqsn]
+
+    async def delete_ts(
+        self,
+        key: str,
+        timeframe: Optional[Union[int, str]] = None,
+    ) -> bool:
+        client = self.client
+        syms = await client.list_symbols()
+        print(syms)
+        # if key not in syms:
+        #     raise KeyError(f'`{fqsn}` table key not found?')
+        return await client.destroy(tbk=key)
 @acm
@@ -296,19 +330,16 @@ async def open_storage_client(
     Load a series by key and deliver in ``numpy`` struct array format.
     '''
-    async with get_client() as client:
-        storage_client = Storage(client)
-        arrays = await storage_client.read_ohlcv(
-            fqsn,
-            period,
-        )
-        yield storage_client, arrays
+    async with (
+        # eventually a storage backend endpoint
+        get_client() as client,
+    ):
+        # slap on our wrapper api
+        yield Storage(client)
-async def backfill_history_diff(
-    # symbol: Symbol
+async def tsdb_history_update(
+    fqsn: str,
 ) -> list[str]:
@@ -338,108 +369,95 @@
     # * the original data feed arch blurb:
     #   - https://github.com/pikers/piker/issues/98
     #
-    broker = 'ib'
-    symbol = 'mnq.globex'
-    # broker = 'binance'
-    # symbol = 'btcusdt'
-    fqsn = mk_fqsn(broker, symbol)
+    profiler = pg.debug.Profiler(
+        disabled=False,  # not pg_profile_enabled(),
+        delayed=False,
+    )
     async with (
-        get_client() as client,
+        open_storage_client(fqsn) as storage,
         maybe_open_feed(
-            broker,
-            [symbol],
-            loglevel='info',
-            # backpressure=False,
+            [fqsn],
             start_stream=False,
         ) as (feed, stream),
     ):
-        syms = await client.list_symbols()
-        log.info(f'Existing symbol set:\n{pformat(syms)}')
+        profiler(f'opened feed for {fqsn}')
+        symbol = feed.symbols.get(fqsn)
+        if symbol:
+            fqsn = symbol.front_fqsn()
+        syms = await storage.client.list_symbols()
+        log.info(f'Existing tsdb symbol set:\n{pformat(syms)}')
+        profiler(f'listed symbols {syms}')
         # diff db history with shm and only write the missing portions
         ohlcv = feed.shm.array
-        key = (fqsn, '1Sec', 'OHLCV')
-        tbk = mk_tbk(key)
-        # diff vs. existing array and append new history
-        # TODO:
-        # TODO: should be no error?
-        # assert not resp.responses
-        start = time.time()
-        qr = await client.query(
-            # Params(fqsn, '1Sec`', 'OHLCV',)
-            Params(*key),
-        )
-        # # Dig out `numpy` results map
-        arrays: dict[tuple[str, int], np.ndarray] = {}
-        for name, data_set in qr.by_symbols().items():
-            in_secs = tf_in_1s.inverse[data_set.timeframe]
-            arrays[(name, in_secs)] = data_set.array
-        s1 = arrays[(fqsn, 1)]
-        to_append = ohlcv[ohlcv['time'] > s1['Epoch'][-1]]
-        end_diff = time.time()
-        diff_ms = round((end_diff - start) * 1e3, ndigits=2)
-        log.info(
-            f'Appending {to_append.size} datums to tsdb from shm\n'
-            f'Total diff time: {diff_ms} ms'
-        )
-        # build mkts schema compat array for writing
-        mkts_dt = np.dtype(_ohlcv_dt)
-        mkts_array = np.zeros(
-            len(to_append),
-            dtype=mkts_dt,
-        )
-        # copy from shm array (yes it's this easy):
-        # https://numpy.org/doc/stable/user/basics.rec.html#assignment-from-other-structured-arrays
-        mkts_array[:] = to_append[[
-            'time',
-            'open',
-            'high',
-            'low',
-            'close',
-            'volume',
-        ]]
-        # write to db
-        resp = await client.write(
-            mkts_array,
-            tbk=tbk,
-            # NOTE: will will append duplicates
-            # for the same timestamp-index.
-            isvariablelength=True,
-        )
-        end_write = time.time()
-        diff_ms = round((end_write - end_diff) * 1e3, ndigits=2)
-        log.info(
-            f'Wrote {to_append.size} datums to tsdb\n'
-            f'Total write time: {diff_ms} ms'
-        )
-        for resp in resp.responses:
-            err = resp.error
-            if err:
-                raise MarketStoreError(err)
-        # TODO: backfiller loop
-        from piker.ui._compression import downsample
-        x, y = downsample(
-            s1['Epoch'],
-            s1['Close'],
-            bins=10,
-        )
-        await tractor.breakpoint()
+        # TODO: use pg profiler
+        tsdb_arrays = await storage.read_ohlcv(fqsn)
+        to_append = feed.shm.array
+        to_prepend = None
+        # hist diffing
+        if tsdb_arrays:
+            onesec = tsdb_arrays[1]
+            to_append = ohlcv[ohlcv['time'] > onesec['Epoch'][-1]]
+            to_prepend = ohlcv[ohlcv['time'] < onesec['Epoch'][0]]
+        profiler('Finished db arrays diffs')
+        for array in [to_append, to_prepend]:
+            if array is None:
+                continue
+            log.info(
+                f'Writing datums {array.size} -> to tsdb from shm\n'
+            )
+            # build mkts schema compat array for writing
+            mkts_dt = np.dtype(_ohlcv_dt)
+            mkts_array = np.zeros(
+                len(array),
+                dtype=mkts_dt,
+            )
+            # copy from shm array (yes it's this easy):
+            # https://numpy.org/doc/stable/user/basics.rec.html#assignment-from-other-structured-arrays
+            mkts_array[:] = array[[
+                'time',
+                'open',
+                'high',
+                'low',
+                'close',
+                'volume',
+            ]]
+            # write to db
+            resp = await storage.client.write(
+                mkts_array,
+                tbk=f'{fqsn}/1Sec/OHLCV',
+                # NOTE: will will append duplicates
+                # for the same timestamp-index.
+                # TODO: pre deduplicate?
+                isvariablelength=True,
+            )
+            log.info(
+                f'Wrote {to_append.size} datums to tsdb\n'
+            )
+            profiler('Finished db writes')
+            for resp in resp.responses:
+                err = resp.error
+                if err:
+                    raise MarketStoreError(err)
+        from tractor.trionics import ipython_embed
+        await ipython_embed()
 async def ingest_quote_stream(

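Note: the write path above relies on numpy's assignment between structured arrays — zero-fill an array in the marketstore schema, then bulk-assign the matching fields selected from the shm array. A self-contained sketch (the `_ohlcv_dt` dtype here is a stand-in, not piker's exact schema):

```python
# Sketch: field-by-position copy from one structured array into another,
# as used for the tsdb write above.
import numpy as np

shm_dtype = np.dtype([
    ('time', 'f8'), ('open', 'f8'), ('high', 'f8'),
    ('low', 'f8'), ('close', 'f8'), ('volume', 'f8'),
])
_ohlcv_dt = np.dtype([  # illustrative marketstore-style schema
    ('Epoch', 'i8'), ('Open', 'f4'), ('High', 'f4'),
    ('Low', 'f4'), ('Close', 'f4'), ('Volume', 'f4'),
])

shm_array = np.array(
    [(1649851200.0, 10.0, 12.0, 9.0, 11.0, 100.0)],
    dtype=shm_dtype,
)

mkts_array = np.zeros(len(shm_array), dtype=_ohlcv_dt)
# assignment between structured arrays matches fields by position:
# https://numpy.org/doc/stable/user/basics.rec.html#assignment-from-other-structured-arrays
mkts_array[:] = shm_array[['time', 'open', 'high', 'low', 'close', 'volume']]
print(mkts_array)
```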
View File

@@ -54,7 +54,7 @@ for name in win_names:
     # disconnect?
     for key_combo, timeout in [
         # only required if we need a connection reset.
-        ('ctrl+alt+r', 12),
+        # ('ctrl+alt+r', 12),
         # data feed reset.
         ('ctrl+alt+f', 6)
     ]: