Compare commits

...

10 Commits

Author SHA1 Message Date
Tyler Goodlet c3553de1f2 Handle non-fqsn for derivs and don't put brokername in 2022-04-13 08:11:57 -04:00
Tyler Goodlet 0790ae8235 Limit ohlc queries to 800k datums to avoid `purerpc` size error 2022-04-13 08:11:57 -04:00
Tyler Goodlet a6b8c03e0e Get sync-to-marketstore-tsdb history retrieval workinnn 2022-04-13 08:11:57 -04:00
Tyler Goodlet ac22190b60 Handle "fatal" level log msgs in docker super 2022-04-13 08:11:57 -04:00
Tyler Goodlet 75d0bf3152 Add basic tsdb history loading
If `marketstore` is detected, try to load only the most recent missing
data from the data provider (broker) and the rest from the tsdb, then
push it all to shm for display in the UI. If the provider/broker
doesn't have the history client endpoint, just use the old one for now
so we can start to incrementally add support. Don't start the ohlc step
incrementer task until the backend signals that the feed is live.
2022-04-13 08:11:57 -04:00
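
A condensed sketch of the ordering this commit is after, based on the `manage_history()` changes in the diff further below. The wrapper function and its parameters are hypothetical glue so the snippet stands alone; `read_ohlcv()`, the `(array, start_dt, end_dt)` history frames, `diff_history()` and the prepending `shm.push()` calls are taken from that diff:

    import pendulum

    async def load_history_sketch(
        fqsn,                 # fully qualified symbol name
        shm,                  # shared-mem OHLC array wrapper
        storage,              # marketstore `Storage` client from the diff below
        open_history_client,  # backend history endpoint (an async ctx manager)
        diff_history,         # helper that slices out only the new datums
        legacy_backfill,      # old-style provider-only backfill
    ):
        tsdb_arrays = await storage.read_ohlcv(fqsn)
        if not tsdb_arrays:
            # nothing in the tsdb yet: fall back to the legacy provider backfill
            await legacy_backfill(fqsn, shm)
            return

        # newest timestamp the tsdb already holds
        fastest = list(tsdb_arrays.values())[0]
        last_tsdb_dt = pendulum.from_timestamp(fastest['Epoch'][-1])

        async with open_history_client(fqsn) as hist:
            # latest frame straight from the provider ...
            array, start_dt, end_dt = await hist(end_dt='')
            shm.push(diff_history(array, start_dt, end_dt))

            # ... then walk backwards only until we reach the tsdb's tail
            while start_dt > last_tsdb_dt:
                array, start_dt, end_dt = await hist(end_dt=start_dt)
                shm.push(diff_history(array, start_dt, end_dt), prepend=True)
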
Tyler Goodlet 85e2602d2e Drop `ms-shell`, add `piker storesh` cmd 2022-04-13 08:11:57 -04:00
Tyler Goodlet edcac1e768 Add diffing logic to `tsdb_history_update()`
Add some basic `numpy` epoch slice logic to generate append and prepend
arrays to write to the db.

Mooar cool things,
- add a `Storage.delete_ts()` method to wipe a column series from the db
  easily.
- don't attempt to read in any OHLC series by default on client load
- add some `pyqtgraph` profiling and drop manual latency measures
- if no db series for the fqsn exists, write the entire shm array
2022-04-13 08:11:57 -04:00
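
For reference, the epoch-slice idea in isolation. This is a toy, self-contained `numpy` example with made-up timestamps, but the `time` / `Epoch` field names and the comparisons mirror the `tsdb_history_update()` diff below:

    import numpy as np

    # toy shm-style OHLC array plus the 'Epoch' column the tsdb already holds
    ohlcv = np.array(
        [(float(t), 1.0) for t in range(100, 110)],
        dtype=[('time', '<f8'), ('close', '<f8')],
    )
    tsdb_epochs = np.arange(103.0, 107.0)

    # datums newer than the db's last record become the append array ...
    to_append = ohlcv[ohlcv['time'] > tsdb_epochs[-1]]
    # ... and datums older than its first record become the prepend array
    to_prepend = ohlcv[ohlcv['time'] < tsdb_epochs[0]]

    assert to_append['time'][0] > tsdb_epochs[-1]
    assert to_prepend['time'][-1] < tsdb_epochs[0]
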
Tyler Goodlet 73b8719984 Drop `pandas` to `numpy` converter 2022-04-13 08:11:57 -04:00
Tyler Goodlet 6b17370711 Disable re-connect for now in ib script 2022-04-13 08:11:57 -04:00
Tyler Goodlet cd3dbc9275 Ensure bfqsn is lower cased for feed api consumers
Also, start tinkering with `tractor.trionics.ipython_embed()`

In an effort to get back to a usable REPL around the mkts client,
this adds usage of the new `tractor` integration api as well as logic
for skipping backfilling if existing tsdb arrays are found.
2022-04-13 08:11:57 -04:00
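
The REPL hook amounts to roughly the following. This is a sketch only: the `storage` check and `backfill` callable are stand-ins for the skip-logic described above, while the zero-argument `await ipython_embed()` call is exactly what appears in the `tsdb_history_update()` diff below:

    from tractor.trionics import ipython_embed

    async def inspect_tsdb_sketch(storage, fqsn, backfill):
        # only (re)backfill when the tsdb has no arrays for this fqsn yet
        if not await storage.read_ohlcv(fqsn):
            await backfill(fqsn)

        # then drop into an interactive shell with the live objects in scope
        await ipython_embed()
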
6 changed files with 309 additions and 314 deletions

View File

@ -24,7 +24,6 @@ from typing import (
# Any,
)
from contextlib import asynccontextmanager as acm
# import time
import trio
from trio_typing import TaskStatus
@ -97,6 +96,7 @@ async def open_docker(
base_url=url,
**kwargs
) if url else docker.from_env(**kwargs)
yield client
except (
@ -127,43 +127,10 @@ async def open_docker(
finally:
if client:
# for c in client.containers.list():
# c.kill()
client.close()
# client.api._custom_adapter.close()
# async def waitfor(
# cntr: Container,
# attr_path: tuple[str],
# expect=None,
# timeout: float = 0.5,
# ) -> Any:
# '''
# Wait for a container's attr value to be set. If ``expect`` is
# provided wait for the value to be set to that value.
# This is an async version of the helper from our ``pytest-dockerctl``
# plugin.
# '''
# def get(val, path):
# for key in path:
# val = val[key]
# return val
# start = time.time()
# while time.time() - start < timeout:
# cntr.reload()
# val = get(cntr.attrs, attr_path)
# if expect is None and val:
# return val
# elif val == expect:
# return val
# else:
# raise TimeoutError("{} failed to be {}, value: \"{}\"".format(
# attr_path, expect if expect else 'not None', val))
for c in client.containers.list():
c.kill()
@tractor.context
@ -220,7 +187,7 @@ async def open_marketstored(
},
mounts=[config_dir_mnt, data_dir_mnt],
detach=True,
stop_signal='SIGINT',
# stop_signal='SIGINT',
init=True,
# remove=True,
)
@ -247,7 +214,7 @@ async def open_marketstored(
seen_so_far.add(entry)
if bp_on_msg:
await tractor.breakpoint()
getattr(log, level)(f'{msg}')
getattr(log, level, log.error)(f'{msg}')
# if "launching tcp listener for all services..." in msg:
if match in msg:
@ -269,6 +236,8 @@ async def open_marketstored(
)
await ctx.started(cntr.id)
# block for the expected "teardown log msg"..
await process_logs_until('exiting...',)
except (

View File

@ -22,8 +22,7 @@ from typing import Any
import decimal
import numpy as np
import pandas as pd
from pydantic import BaseModel, validate_arguments
from pydantic import BaseModel
# from numba import from_dtype
@ -254,61 +253,6 @@ class Symbol(BaseModel):
return keys
def from_df(
df: pd.DataFrame,
source=None,
default_tf=None
) -> np.recarray:
"""Convert OHLC formatted ``pandas.DataFrame`` to ``numpy.recarray``.
"""
df.reset_index(inplace=True)
# hackery to convert field names
date = 'Date'
if 'date' in df.columns:
date = 'date'
# convert to POSIX time
df[date] = [d.timestamp() for d in df[date]]
# try to rename from some camel case
columns = {
'Date': 'time',
'date': 'time',
'Open': 'open',
'High': 'high',
'Low': 'low',
'Close': 'close',
'Volume': 'volume',
# most feeds are providing this over session anchored
'vwap': 'bar_wap',
# XXX: ib_insync calls this the "wap of the bar"
# but no clue what it actually is...
# https://github.com/pikers/piker/issues/119#issuecomment-729120988
'average': 'bar_wap',
}
df = df.rename(columns=columns)
for name in df.columns:
# if name not in base_ohlc_dtype.names[1:]:
if name not in base_ohlc_dtype.names:
del df[name]
# TODO: it turns out column access on recarrays is actually slower:
# https://jakevdp.github.io/PythonDataScienceHandbook/02.09-structured-data-numpy.html#RecordArrays:-Structured-Arrays-with-a-Twist
# it might make sense to make these structured arrays?
array = df.to_records(index=False)
_nan_to_closest_num(array)
return array
def _nan_to_closest_num(array: np.ndarray):
"""Return interpolated values instead of NaN.

View File

@ -126,7 +126,7 @@ def ms_stream(
)
@click.argument('symbols', nargs=-1)
@click.pass_obj
def ms_shell(
def storesh(
config,
tl,
host,
@ -137,43 +137,18 @@ def ms_shell(
Start an IPython shell ready to query the local marketstore db.
'''
from piker.data.marketstore import backfill_history_diff
from piker.data.marketstore import tsdb_history_update
from piker._daemon import open_piker_runtime
async def main():
nonlocal symbols
async with open_piker_runtime(
'ms_shell',
'storesh',
enable_modules=['piker.data._ahab'],
):
try:
await backfill_history_diff()
except OSError:
# TODO: write magics to query marketstore
sym = symbols[0]
symbol, _, broker = sym.rpartition('.')
# (maybe) allocate shm array for this broker/symbol which will
# be used for fast near-term history capture and processing.
shm, opened = maybe_open_shm_array(
key=sym,
dtype=base_iohlc_dtype,
)
# load anything found in shm
from numpy.lib.recfunctions import structured_to_unstructured
mxmn = structured_to_unstructured(
shm.array[['low', 'high']],
# dtype=[('mxmn', '<f8'), ('index', '<i8')],
).flatten()
from piker.ui._compression import downsample
xd, yd = downsample(
y=mxmn,
x=np.arange(len(mxmn)),
bins=4,
)
await tractor.breakpoint()
symbol = symbols[0]
await tsdb_history_update(symbol)
trio.run(main)
@ -184,10 +159,11 @@ def ms_shell(
@click.argument('name', nargs=1, required=True)
@click.pass_obj
def ingest(config, name, test_file, tl):
"""Ingest real-time broker quotes and ticks to a marketstore instance.
"""
'''
Ingest real-time broker quotes and ticks to a marketstore instance.
'''
# global opts
brokermods = config['brokermods']
loglevel = config['loglevel']
tractorloglevel = config['tractorloglevel']
# log = config['log']

View File

@ -22,7 +22,6 @@ This module is enabled for ``brokerd`` daemons.
"""
from __future__ import annotations
from dataclasses import dataclass, field
from datetime import datetime
from contextlib import asynccontextmanager
from functools import partial
from types import ModuleType
@ -32,11 +31,14 @@ from typing import (
Awaitable,
)
import pendulum
import trio
from trio.abc import ReceiveChannel
from trio_typing import TaskStatus
import tractor
from pydantic import BaseModel
import numpy as np
from ..brokers import get_brokermod
from .._cacheables import maybe_open_context
@ -49,7 +51,6 @@ from ._sharedmem import (
maybe_open_shm_array,
attach_shm_array,
ShmArray,
_secs_in_day,
)
from .ingest import get_ingestormod
from ._source import (
@ -192,6 +193,22 @@ async def _setup_persistent_brokerd(
await trio.sleep_forever()
async def start_backfill(
mod: ModuleType,
fqsn: str,
shm: ShmArray,
task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,
) -> int:
return await mod.backfill_bars(
fqsn,
shm,
task_status=task_status,
)
async def manage_history(
mod: ModuleType,
bus: _FeedsBus,
@ -220,104 +237,150 @@ async def manage_history(
# we expect the sub-actor to write
readonly=False,
)
# TODO: history validation
if not opened:
raise RuntimeError(
"Persistent shm for sym was already open?!"
)
log.info('Scanning for existing `marketstored`')
is_up = await check_for_service('marketstored')
if is_up and opened:
# for now only do backfilling if no tsdb can be found
do_legacy_backfill = not is_up and opened
open_history_client = getattr(mod, 'open_history_client', None)
if is_up and opened and open_history_client:
log.info('Found existing `marketstored`')
from . import marketstore
async with marketstore.open_storage_client(
fqsn,
) as (storage, tsdb_arrays):
) as storage:
# TODO: history validation
# assert opened, f'Persistent shm for {symbol} was already open?!'
# if not opened:
# raise RuntimeError(
# "Persistent shm for sym was already open?!"
# )
tsdb_arrays = await storage.read_ohlcv(fqsn)
if tsdb_arrays:
if not tsdb_arrays:
do_legacy_backfill = True
else:
log.info(f'Loaded tsdb history {tsdb_arrays}')
fastest = list(tsdb_arrays[fqsn].values())[0]
last_s = fastest['Epoch'][-1]
# TODO: see if there's faster multi-field reads:
# https://numpy.org/doc/stable/user/basics.rec.html#accessing-multiple-fields
# re-index with a `time` and index field
shm.push(
fastest[-3 * _secs_in_day:],
# insert the history pre a "days worth" of samples
# to leave some real-time buffer space at the end.
prepend=True,
start=shm._len - _secs_in_day,
field_map={
'Epoch': 'time',
'Open': 'open',
'High': 'high',
'Low': 'low',
'Close': 'close',
'Volume': 'volume',
},
fastest = list(tsdb_arrays.values())[0]
times = fastest['Epoch']
first, last = times[0], times[-1]
first_tsdb_dt, last_tsdb_dt = map(
pendulum.from_timestamp, [first, last]
)
# TODO: this should be used verbatim for the pure
# shm backfiller approach below.
def diff_history(
array,
start_dt,
end_dt,
) -> np.ndarray:
s_diff = (last_tsdb_dt - start_dt).seconds
# if we detect a partial frame's worth of data
# that is new, slice out only that history and
# write to shm.
if s_diff > 0:
assert last_tsdb_dt > start_dt
selected = array['time'] > last_tsdb_dt.timestamp()
to_push = array[selected]
log.info(
f'Pushing partial frame {to_push.size} to shm'
)
return to_push
else:
return array
# start history anal and load missing new data via backend.
async with mod.open_history_client(fqsn) as hist:
async with open_history_client(fqsn) as hist:
# get latest query's worth of history
array, next_dt = await hist(end_dt='')
# get latest query's worth of history all the way
# back to what is recorded in the tsdb
array, start_dt, end_dt = await hist(end_dt='')
to_push = diff_history(array, start_dt, end_dt)
shm.push(to_push)
last_dt = datetime.fromtimestamp(last_s)
array, next_dt = await hist(end_dt=last_dt)
# let caller unblock and deliver latest history frame
task_status.started(shm)
some_data_ready.set()
some_data_ready.set()
# pull new history frames until we hit latest
# already in the tsdb
while start_dt > last_tsdb_dt:
array, start_dt, end_dt = await hist(end_dt=start_dt)
to_push = diff_history(array, start_dt, end_dt)
shm.push(to_push, prepend=True)
elif opened:
# TODO: see if there's faster multi-field reads:
# https://numpy.org/doc/stable/user/basics.rec.html#accessing-multiple-fields
# re-index with a `time` and index field
shm.push(
fastest[-shm._first.value:],
# insert the history pre a "days worth" of samples
# to leave some real-time buffer space at the end.
prepend=True,
# start=shm._len - _secs_in_day,
field_map={
'Epoch': 'time',
'Open': 'open',
'High': 'high',
'Low': 'low',
'Close': 'close',
'Volume': 'volume',
},
)
# TODO: write new data to tsdb to be ready for the next
# read.
if do_legacy_backfill:
# do a legacy incremental backfill from the provider.
log.info('No existing `marketstored` found..')
bfqsn = fqsn.replace('.' + mod.name, '')
# start the history backfill task; ``backfill_bars()`` is
# a required backend func and this must block until shm is
# filled with the first set of ohlc bars
_ = await bus.nursery.start(mod.backfill_bars, fqsn, shm)
await bus.nursery.start(
start_backfill,
mod,
bfqsn,
shm,
)
# yield back after client connect with filled shm
task_status.started(shm)
# yield back after client connect with filled shm
task_status.started(shm)
# indicate to caller that feed can be delivered to
# remote requesting client since we've loaded history
# data that can be used.
some_data_ready.set()
# detect sample step size for sampled historical data
times = shm.array['time']
delay_s = times[-1] - times[times != times[-1]][-1]
# begin real-time updates of shm and tsdb once the feed
# goes live.
await feed_is_live.wait()
if opened:
sampler.ohlcv_shms.setdefault(delay_s, []).append(shm)
# start shm incrementing for OHLC sampling at the current
# detected sampling period if one dne.
if sampler.incrementers.get(delay_s) is None:
await bus.start_task(
increment_ohlc_buffer,
delay_s,
)
# indicate to caller that feed can be delivered to
# remote requesting client since we've loaded history
# data that can be used.
some_data_ready.set()
# history retrieval loop depending on user interaction and thus
# a small RPC-prot for remotely controlling what data is loaded
# for viewing.
await trio.sleep_forever()
# cs.cancel()
async def allocate_persistent_feed(
bus: _FeedsBus,
brokername: str,
symbol: str,
loglevel: str,
start_stream: bool = True,
@ -337,6 +400,7 @@ async def allocate_persistent_feed(
- a real-time streaming task which connec
'''
# load backend module
try:
mod = get_brokermod(brokername)
except ImportError:
@ -361,8 +425,10 @@ async def allocate_persistent_feed(
loglevel=loglevel,
)
)
# the broker-specific fully qualified symbol name
bfqsn = init_msg[symbol]['fqsn']
# the broker-specific fully qualified symbol name,
# but ensure it is lower-cased for external use.
bfqsn = init_msg[symbol]['fqsn'].lower()
init_msg[symbol]['fqsn'] = bfqsn
# HISTORY, run 2 tasks:
# - a history loader / maintainer
@ -377,7 +443,7 @@ async def allocate_persistent_feed(
manage_history,
mod,
bus,
bfqsn,
'.'.join((bfqsn, brokername)),
some_data_ready,
feed_is_live,
)
@ -390,9 +456,11 @@ async def allocate_persistent_feed(
# true fqsn
fqsn = '.'.join((bfqsn, brokername))
# add a fqsn entry that includes the ``.<broker>`` suffix
# and an entry that includes the broker-specific fqsn (including
# any new suffixes or elements as injected by the backend).
init_msg[fqsn] = msg
init_msg[bfqsn] = msg
# TODO: pretty sure we don't need this? why not just leave 1s as
# the fastest "sample period" since we'll probably always want that
@ -406,13 +474,14 @@ async def allocate_persistent_feed(
await some_data_ready.wait()
# append ``.<broker>`` suffix to each quote symbol
bsym = symbol + f'.{brokername}'
acceptable_not_fqsn_with_broker_suffix = symbol + f'.{brokername}'
generic_first_quotes = {
bsym: first_quote,
acceptable_not_fqsn_with_broker_suffix: first_quote,
fqsn: first_quote,
}
bus.feeds[symbol] = bus.feeds[fqsn] = (
bus.feeds[symbol] = bus.feeds[bfqsn] = (
init_msg,
generic_first_quotes,
)
@ -425,9 +494,22 @@ async def allocate_persistent_feed(
if not start_stream:
await trio.sleep_forever()
# backend will indicate when real-time quotes have begun.
# begin real-time updates of shm and tsdb once the feed goes live and
# the backend will indicate when real-time quotes have begun.
await feed_is_live.wait()
# start shm incrementer task for OHLC style sampling
# at the current detected step period.
times = shm.array['time']
delay_s = times[-1] - times[times != times[-1]][-1]
sampler.ohlcv_shms.setdefault(delay_s, []).append(shm)
if sampler.incrementers.get(delay_s) is None:
await bus.start_task(
increment_ohlc_buffer,
delay_s,
)
sum_tick_vlm: bool = init_msg.get(
'shm_write_opts', {}
).get('sum_tick_vlm', True)
@ -450,7 +532,7 @@ async def open_feed_bus(
ctx: tractor.Context,
brokername: str,
symbol: str,
symbol: str, # normally expected to be the broker-specific fqsn
loglevel: str,
tick_throttle: Optional[float] = None,
start_stream: bool = True,
@ -472,7 +554,9 @@ async def open_feed_bus(
# TODO: check for any stale shm entries for this symbol
# (after we also group them in a nice `/dev/shm/piker/` subdir).
# ensure we are who we think we are
assert 'brokerd' in tractor.current_actor().name
servicename = tractor.current_actor().name
assert 'brokerd' in servicename
assert brokername in servicename
bus = get_feed_bus(brokername)
@ -482,7 +566,7 @@ async def open_feed_bus(
entry = bus.feeds.get(symbol)
if entry is None:
# allocate a new actor-local stream bus which
# will persist for this `brokerd`.
# will persist for this `brokerd`'s service lifetime.
async with bus.task_lock:
await bus.nursery.start(
partial(
@ -506,12 +590,12 @@ async def open_feed_bus(
init_msg, first_quotes = bus.feeds[symbol]
msg = init_msg[symbol]
bfqsn = msg['fqsn']
bfqsn = msg['fqsn'].lower()
# true fqsn
fqsn = '.'.join([bfqsn, brokername])
assert fqsn in first_quotes
assert bus.feeds[fqsn]
assert bus.feeds[bfqsn]
# broker-ambiguous symbol (provided on cli - eg. mnq.globex.ib)
bsym = symbol + f'.{brokername}'
@ -825,7 +909,10 @@ async def maybe_open_feed(
**kwargs,
) -> (Feed, ReceiveChannel[dict[str, Any]]):
) -> (
Feed,
ReceiveChannel[dict[str, Any]],
):
'''
Maybe open a data feed to a ``brokerd`` daemon only if there is no
local one for the broker-symbol pair; if one is cached use it wrapped
@ -846,6 +933,7 @@ async def maybe_open_feed(
'start_stream': kwargs.get('start_stream', True),
},
key=fqsn,
) as (cache_hit, feed):
if cache_hit:

View File

@ -29,14 +29,13 @@ from typing import (
Any,
Optional,
Union,
# Callable,
# TYPE_CHECKING,
)
import time
from math import isnan
from bidict import bidict
import msgpack
import pyqtgraph as pg
import numpy as np
import pandas as pd
import tractor
@ -49,15 +48,8 @@ from anyio_marketstore import (
import purerpc
from .feed import maybe_open_feed
from ._source import (
mk_fqsn,
# Symbol,
)
from ..log import get_logger, get_console_log
# if TYPE_CHECKING:
# from ._sharedmem import ShmArray
log = get_logger(__name__)
@ -235,6 +227,16 @@ class Storage:
# series' cache from tsdb reads
self._arrays: dict[str, np.ndarray] = {}
async def list_keys(self) -> list[str]:
return await self.client.list_symbols()
async def search_keys(self, pattern: str) -> list[str]:
'''
Search for time series key in the storage backend.
'''
...
async def write_ticks(self, ticks: list) -> None:
...
@ -256,14 +258,29 @@ class Storage:
if fqsn not in syms:
return {}
tfstr = tf_in_1s[1]
params = Params(
symbols=fqsn,
timeframe=tfstr,
attrgroup='OHLCV',
# limit_from_start=True,
# TODO: figure the max limit here given the
# ``purerpc`` msg size limit: 33554432
limit=int(800e3),
)
if timeframe is None:
log.info(f'starting {fqsn} tsdb granularity scan..')
# loop through and try to find highest granularity
for tfstr in tf_in_1s.values():
try:
log.info(f'querying for {tfstr}@{fqsn}')
result = await client.query(Params(fqsn, tfstr, 'OHLCV',))
params.set('timeframe', tfstr)
result = await client.query(params)
break
except purerpc.grpclib.exceptions.UnknownError:
# XXX: this is already logged by the container and
# thus shows up through `marketstored` logs relay.
@ -273,9 +290,11 @@ class Storage:
return {}
else:
tfstr = tf_in_1s[timeframe]
result = await client.query(Params(fqsn, tfstr, 'OHLCV',))
result = await client.query(params)
# TODO: it turns out column access on recarrays is actually slower:
# https://jakevdp.github.io/PythonDataScienceHandbook/02.09-structured-data-numpy.html#RecordArrays:-Structured-Arrays-with-a-Twist
# it might make sense to make these structured arrays?
# Fill out a `numpy` array-results map
arrays = {}
for fqsn, data_set in result.by_symbols().items():
@ -283,7 +302,22 @@ class Storage:
tf_in_1s.inverse[data_set.timeframe]
] = data_set.array
return arrays[fqsn][timeframe] if timeframe else arrays
return arrays[fqsn][timeframe] if timeframe else arrays[fqsn]
async def delete_ts(
self,
key: str,
timeframe: Optional[Union[int, str]] = None,
) -> bool:
client = self.client
syms = await client.list_symbols()
print(syms)
# if key not in syms:
# raise KeyError(f'`{fqsn}` table key not found?')
return await client.destroy(tbk=key)
@acm
@ -296,19 +330,16 @@ async def open_storage_client(
Load a series by key and deliver in ``numpy`` struct array format.
'''
async with get_client() as client:
storage_client = Storage(client)
arrays = await storage_client.read_ohlcv(
fqsn,
period,
)
yield storage_client, arrays
async with (
# eventually a storage backend endpoint
get_client() as client,
):
# slap on our wrapper api
yield Storage(client)
async def backfill_history_diff(
# symbol: Symbol
async def tsdb_history_update(
fqsn: str,
) -> list[str]:
@ -338,108 +369,95 @@ async def backfill_history_diff(
# * the original data feed arch blurb:
# - https://github.com/pikers/piker/issues/98
#
broker = 'ib'
symbol = 'mnq.globex'
# broker = 'binance'
# symbol = 'btcusdt'
fqsn = mk_fqsn(broker, symbol)
profiler = pg.debug.Profiler(
disabled=False, # not pg_profile_enabled(),
delayed=False,
)
async with (
get_client() as client,
open_storage_client(fqsn) as storage,
maybe_open_feed(
broker,
[symbol],
loglevel='info',
# backpressure=False,
[fqsn],
start_stream=False,
) as (feed, stream),
):
syms = await client.list_symbols()
log.info(f'Existing symbol set:\n{pformat(syms)}')
profiler(f'opened feed for {fqsn}')
symbol = feed.symbols.get(fqsn)
if symbol:
fqsn = symbol.front_fqsn()
syms = await storage.client.list_symbols()
log.info(f'Existing tsdb symbol set:\n{pformat(syms)}')
profiler(f'listed symbols {syms}')
# diff db history with shm and only write the missing portions
ohlcv = feed.shm.array
key = (fqsn, '1Sec', 'OHLCV')
tbk = mk_tbk(key)
# TODO: use pg profiler
tsdb_arrays = await storage.read_ohlcv(fqsn)
# diff vs. existing array and append new history
# TODO:
to_append = feed.shm.array
to_prepend = None
# TODO: should be no error?
# assert not resp.responses
# hist diffing
if tsdb_arrays:
onesec = tsdb_arrays[1]
to_append = ohlcv[ohlcv['time'] > onesec['Epoch'][-1]]
to_prepend = ohlcv[ohlcv['time'] < onesec['Epoch'][0]]
start = time.time()
profiler('Finished db arrays diffs')
qr = await client.query(
# Params(fqsn, '1Sec`', 'OHLCV',)
Params(*key),
)
# # Dig out `numpy` results map
arrays: dict[tuple[str, int], np.ndarray] = {}
for name, data_set in qr.by_symbols().items():
in_secs = tf_in_1s.inverse[data_set.timeframe]
arrays[(name, in_secs)] = data_set.array
for array in [to_append, to_prepend]:
if array is None:
continue
s1 = arrays[(fqsn, 1)]
to_append = ohlcv[ohlcv['time'] > s1['Epoch'][-1]]
log.info(
f'Writing datums {array.size} -> to tsdb from shm\n'
)
end_diff = time.time()
diff_ms = round((end_diff - start) * 1e3, ndigits=2)
# build mkts schema compat array for writing
mkts_dt = np.dtype(_ohlcv_dt)
mkts_array = np.zeros(
len(array),
dtype=mkts_dt,
)
# copy from shm array (yes it's this easy):
# https://numpy.org/doc/stable/user/basics.rec.html#assignment-from-other-structured-arrays
mkts_array[:] = array[[
'time',
'open',
'high',
'low',
'close',
'volume',
]]
log.info(
f'Appending {to_append.size} datums to tsdb from shm\n'
f'Total diff time: {diff_ms} ms'
)
# write to db
resp = await storage.client.write(
mkts_array,
tbk=f'{fqsn}/1Sec/OHLCV',
# build mkts schema compat array for writing
mkts_dt = np.dtype(_ohlcv_dt)
mkts_array = np.zeros(
len(to_append),
dtype=mkts_dt,
)
# copy from shm array (yes it's this easy):
# https://numpy.org/doc/stable/user/basics.rec.html#assignment-from-other-structured-arrays
mkts_array[:] = to_append[[
'time',
'open',
'high',
'low',
'close',
'volume',
]]
# NOTE: this will append duplicates
# for the same timestamp-index.
# TODO: pre deduplicate?
isvariablelength=True,
)
# write to db
resp = await client.write(
mkts_array,
tbk=tbk,
# NOTE: this will append duplicates
# for the same timestamp-index.
isvariablelength=True,
)
end_write = time.time()
diff_ms = round((end_write - end_diff) * 1e3, ndigits=2)
log.info(
f'Wrote {to_append.size} datums to tsdb\n'
f'Total write time: {diff_ms} ms'
)
for resp in resp.responses:
err = resp.error
if err:
raise MarketStoreError(err)
log.info(
f'Wrote {to_append.size} datums to tsdb\n'
)
profiler('Finished db writes')
# TODO: backfiller loop
from piker.ui._compression import downsample
x, y = downsample(
s1['Epoch'],
s1['Close'],
bins=10,
)
await tractor.breakpoint()
for resp in resp.responses:
err = resp.error
if err:
raise MarketStoreError(err)
from tractor.trionics import ipython_embed
await ipython_embed()
async def ingest_quote_stream(

View File

@ -54,7 +54,7 @@ for name in win_names:
# disconnect?
for key_combo, timeout in [
# only required if we need a connection reset.
('ctrl+alt+r', 12),
# ('ctrl+alt+r', 12),
# data feed reset.
('ctrl+alt+f', 6)
]: