Compare commits

c3553de1f259c219e5a3093405a5f2fdcb731372..87fab6c5b1b5d51f8513eec4804e8f0871e763a5

No commits in common. "c3553de1f259c219e5a3093405a5f2fdcb731372" and "87fab6c5b1b5d51f8513eec4804e8f0871e763a5" have entirely different histories.

6 changed files with 314 additions and 309 deletions

View File

@@ -24,6 +24,7 @@ from typing import (
     # Any,
 )
 from contextlib import asynccontextmanager as acm
+# import time
 
 import trio
 from trio_typing import TaskStatus
@@ -96,7 +97,6 @@ async def open_docker(
             base_url=url,
             **kwargs
         ) if url else docker.from_env(**kwargs)
 
         yield client
 
     except (
@@ -127,10 +127,43 @@ async def open_docker(
     finally:
         if client:
-            # for c in client.containers.list():
-            #     c.kill()
             client.close()
             # client.api._custom_adapter.close()
+        for c in client.containers.list():
+            c.kill()
+
+
+# async def waitfor(
+#     cntr: Container,
+#     attr_path: tuple[str],
+#     expect=None,
+#     timeout: float = 0.5,
+# ) -> Any:
+#     '''
+#     Wait for a container's attr value to be set. If ``expect`` is
+#     provided wait for the value to be set to that value.
+#     This is an async version of the helper from our ``pytest-dockerctl``
+#     plugin.
+#     '''
+#     def get(val, path):
+#         for key in path:
+#             val = val[key]
+#         return val
+#     start = time.time()
+#     while time.time() - start < timeout:
+#         cntr.reload()
+#         val = get(cntr.attrs, attr_path)
+#         if expect is None and val:
+#             return val
+#         elif val == expect:
+#             return val
+#     else:
+#         raise TimeoutError("{} failed to be {}, value: \"{}\"".format(
+#             attr_path, expect if expect else 'not None', val))
 
 
 @tractor.context
@@ -187,7 +220,7 @@ async def open_marketstored(
         },
         mounts=[config_dir_mnt, data_dir_mnt],
         detach=True,
-        # stop_signal='SIGINT',
+        stop_signal='SIGINT',
         init=True,
         # remove=True,
     )
@@ -214,7 +247,7 @@ async def open_marketstored(
             seen_so_far.add(entry)
 
             if bp_on_msg:
                 await tractor.breakpoint()
-            getattr(log, level, log.error)(f'{msg}')
+            getattr(log, level)(f'{msg}')
 
             # if "launching tcp listener for all services..." in msg:
             if match in msg:
@@ -236,8 +269,6 @@ async def open_marketstored(
         )
         await ctx.started(cntr.id)
 
-        # block for the expected "teardown log msg"..
         await process_logs_until('exiting...',)
 
     except (

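A minimal, synchronous sketch of the docker-py setup/teardown pattern the hunks above toggle; `from_env()`, `containers.list()`, `kill()` and `close()` are all public docker-py calls, the trio/tractor supervision from `_ahab.py` is omitted, and the kill-before-close ordering is a conservative assumption rather than a copy of either commit:

from contextlib import contextmanager
from typing import Optional

import docker


@contextmanager
def open_docker(url: Optional[str] = None, **kwargs):
    client = None
    try:
        # connect to an explicit daemon url or fall back to env config
        client = docker.DockerClient(
            base_url=url,
            **kwargs
        ) if url else docker.from_env(**kwargs)
        yield client
    finally:
        if client:
            # reap any still-running containers before closing the client
            for c in client.containers.list():
                c.kill()
            client.close()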
View File

@@ -22,7 +22,8 @@ from typing import Any
 import decimal
 
 import numpy as np
-from pydantic import BaseModel
+import pandas as pd
+from pydantic import BaseModel, validate_arguments
 # from numba import from_dtype
@@ -253,6 +254,61 @@ class Symbol(BaseModel):
         return keys
 
 
+def from_df(
+    df: pd.DataFrame,
+    source=None,
+    default_tf=None
+) -> np.recarray:
+    """Convert OHLC formatted ``pandas.DataFrame`` to ``numpy.recarray``.
+
+    """
+    df.reset_index(inplace=True)
+
+    # hackery to convert field names
+    date = 'Date'
+    if 'date' in df.columns:
+        date = 'date'
+
+    # convert to POSIX time
+    df[date] = [d.timestamp() for d in df[date]]
+
+    # try to rename from some camel case
+    columns = {
+        'Date': 'time',
+        'date': 'time',
+        'Open': 'open',
+        'High': 'high',
+        'Low': 'low',
+        'Close': 'close',
+        'Volume': 'volume',
+
+        # most feeds are providing this over sesssion anchored
+        'vwap': 'bar_wap',
+
+        # XXX: ib_insync calls this the "wap of the bar"
+        # but no clue what is actually is...
+        # https://github.com/pikers/piker/issues/119#issuecomment-729120988
+        'average': 'bar_wap',
+    }
+
+    df = df.rename(columns=columns)
+
+    for name in df.columns:
+        # if name not in base_ohlc_dtype.names[1:]:
+        if name not in base_ohlc_dtype.names:
+            del df[name]
+
+    # TODO: it turns out column access on recarrays is actually slower:
+    # https://jakevdp.github.io/PythonDataScienceHandbook/02.09-structured-data-numpy.html#RecordArrays:-Structured-Arrays-with-a-Twist
+    # it might make sense to make these structured arrays?
+    array = df.to_records(index=False)
+    _nan_to_closest_num(array)
+
+    return array
+
+
 def _nan_to_closest_num(array: np.ndarray):
     """Return interpolated values instead of NaN.

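A small, self-contained sketch of the DataFrame-to-recarray conversion that the `from_df()` helper above performs; the toy two-row frame and its column set are illustrative only, and the `base_ohlc_dtype` filtering and NaN-interpolation steps are skipped:

import numpy as np
import pandas as pd

df = pd.DataFrame(
    {
        'Open': [1.0, 2.0],
        'High': [1.5, 2.5],
        'Low': [0.5, 1.5],
        'Close': [1.2, 2.2],
        'Volume': [100, 200],
    },
    index=pd.to_datetime(['2021-01-01', '2021-01-02']),
)
df.index.name = 'Date'
df = df.reset_index()

# POSIX timestamps, then snake_case field names, as in ``from_df()``
df['Date'] = [d.timestamp() for d in df['Date']]
df = df.rename(columns={
    'Date': 'time',
    'Open': 'open',
    'High': 'high',
    'Low': 'low',
    'Close': 'close',
    'Volume': 'volume',
})

array = df.to_records(index=False)
assert array.dtype.names == ('time', 'open', 'high', 'low', 'close', 'volume')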
View File

@@ -126,7 +126,7 @@ def ms_stream(
 )
 @click.argument('symbols', nargs=-1)
 @click.pass_obj
-def storesh(
+def ms_shell(
     config,
     tl,
     host,
@@ -137,18 +137,43 @@ def storesh(
     Start an IPython shell ready to query the local marketstore db.
 
     '''
-    from piker.data.marketstore import tsdb_history_update
+    from piker.data.marketstore import backfill_history_diff
     from piker._daemon import open_piker_runtime
 
     async def main():
         nonlocal symbols
 
         async with open_piker_runtime(
-            'storesh',
+            'ms_shell',
             enable_modules=['piker.data._ahab'],
         ):
-            symbol = symbols[0]
-            await tsdb_history_update(symbol)
+            try:
+                await backfill_history_diff()
+            except OSError:
+                # TODO: write magics to query marketstore
+                sym = symbols[0]
+                symbol, _, broker = sym.rpartition('.')
+
+                # (maybe) allocate shm array for this broker/symbol which will
+                # be used for fast near-term history capture and processing.
+                shm, opened = maybe_open_shm_array(
+                    key=sym,
+                    dtype=base_iohlc_dtype,
+                )
+
+                # load anything found in shm
+                from numpy.lib.recfunctions import structured_to_unstructured
+                mxmn = structured_to_unstructured(
+                    shm.array[['low', 'high']],
+                    # dtype=[('mxmn', '<f8'), ('index', '<i8')],
+                ).flatten()
+
+                from piker.ui._compression import downsample
+                xd, yd = downsample(
+                    y=mxmn,
+                    x=np.arange(len(mxmn)),
+                    bins=4,
+                )
+
+                await tractor.breakpoint()
 
     trio.run(main)
@@ -159,11 +184,10 @@ def storesh(
 @click.argument('name', nargs=1, required=True)
 @click.pass_obj
 def ingest(config, name, test_file, tl):
-    '''
-    Ingest real-time broker quotes and ticks to a marketstore instance.
-    '''
+    """Ingest real-time broker quotes and ticks to a marketstore instance.
+    """
     # global opts
     brokermods = config['brokermods']
     loglevel = config['loglevel']
     tractorloglevel = config['tractorloglevel']
     # log = config['log']

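A standalone sketch of the shm min/max extraction done in the `ms_shell` fallback above: `numpy.lib.recfunctions.structured_to_unstructured` pulls the `low`/`high` columns out of a structured OHLC array as a flat vector ready for downsampling; the toy array stands in for `shm.array` and the piker `downsample()` call is left out:

import numpy as np
from numpy.lib.recfunctions import structured_to_unstructured

# stand-in for ``shm.array``: two OHLC rows
ohlc = np.array(
    [(1.0, 2.0, 0.5, 1.5), (1.5, 2.5, 1.0, 2.0)],
    dtype=[('open', '<f8'), ('high', '<f8'), ('low', '<f8'), ('close', '<f8')],
)

# interleaved [low0, high0, low1, high1, ...] just like the cli code above
mxmn = structured_to_unstructured(ohlc[['low', 'high']]).flatten()
x = np.arange(len(mxmn))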
View File

@@ -22,6 +22,7 @@ This module is enabled for ``brokerd`` daemons.
 """
 from __future__ import annotations
 from dataclasses import dataclass, field
+from datetime import datetime
 from contextlib import asynccontextmanager
 from functools import partial
 from types import ModuleType
@@ -31,14 +32,11 @@ from typing import (
     Awaitable,
 )
 
-import pendulum
 import trio
 from trio.abc import ReceiveChannel
 from trio_typing import TaskStatus
 import tractor
 from pydantic import BaseModel
-import numpy as np
 
 from ..brokers import get_brokermod
 from .._cacheables import maybe_open_context
@@ -51,6 +49,7 @@ from ._sharedmem import (
     maybe_open_shm_array,
     attach_shm_array,
     ShmArray,
+    _secs_in_day,
 )
 from .ingest import get_ingestormod
 from ._source import (
@@ -193,22 +192,6 @@ async def _setup_persistent_brokerd(
         await trio.sleep_forever()
 
 
-async def start_backfill(
-    mod: ModuleType,
-    fqsn: str,
-    shm: ShmArray,
-    task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,
-
-) -> int:
-
-    return await mod.backfill_bars(
-        fqsn,
-        shm,
-        task_status=task_status,
-    )
-
-
 async def manage_history(
     mod: ModuleType,
     bus: _FeedsBus,
@@ -237,150 +220,104 @@ async def manage_history(
         # we expect the sub-actor to write
         readonly=False,
     )
 
-    # TODO: history validation
-    if not opened:
-        raise RuntimeError(
-            "Persistent shm for sym was already open?!"
-        )
-
     log.info('Scanning for existing `marketstored`')
 
     is_up = await check_for_service('marketstored')
 
-    # for now only do backfilling if no tsdb can be found
-    do_legacy_backfill = not is_up and opened
-    open_history_client = getattr(mod, 'open_history_client', None)
-
-    if is_up and opened and open_history_client:
+    if is_up and opened:
         log.info('Found existing `marketstored`')
 
         from . import marketstore
         async with marketstore.open_storage_client(
             fqsn,
-        ) as storage:
-
-            tsdb_arrays = await storage.read_ohlcv(fqsn)
-
-            if not tsdb_arrays:
-                do_legacy_backfill = True
-
-            else:
+        ) as (storage, tsdb_arrays):
+
+            # TODO: history validation
+            # assert opened, f'Persistent shm for {symbol} was already open?!'
+            # if not opened:
+            #     raise RuntimeError(
+            #         "Persistent shm for sym was already open?!"
+            #     )
+
+            if tsdb_arrays:
                 log.info(f'Loaded tsdb history {tsdb_arrays}')
 
-                fastest = list(tsdb_arrays.values())[0]
-                times = fastest['Epoch']
-                first, last = times[0], times[-1]
-                first_tsdb_dt, last_tsdb_dt = map(
-                    pendulum.from_timestamp, [first, last]
+                fastest = list(tsdb_arrays[fqsn].values())[0]
+                last_s = fastest['Epoch'][-1]
+
+                # TODO: see if there's faster multi-field reads:
+                # https://numpy.org/doc/stable/user/basics.rec.html#accessing-multiple-fields
+                # re-index with a `time` and index field
+                shm.push(
+                    fastest[-3 * _secs_in_day:],
+
+                    # insert the history pre a "days worth" of samples
+                    # to leave some real-time buffer space at the end.
+                    prepend=True,
+                    start=shm._len - _secs_in_day,
+                    field_map={
+                        'Epoch': 'time',
+                        'Open': 'open',
+                        'High': 'high',
+                        'Low': 'low',
+                        'Close': 'close',
+                        'Volume': 'volume',
+                    },
                 )
 
-                # TODO: this should be used verbatim for the pure
-                # shm backfiller approach below.
-                def diff_history(
-                    array,
-                    start_dt,
-                    end_dt,
-                ) -> np.ndarray:
-                    s_diff = (last_tsdb_dt - start_dt).seconds
-
-                    # if we detect a partial frame's worth of data
-                    # that is new, slice out only that history and
-                    # write to shm.
-                    if s_diff > 0:
-                        assert last_tsdb_dt > start_dt
-                        selected = array['time'] > last_tsdb_dt.timestamp()
-                        to_push = array[selected]
-                        log.info(
-                            f'Pushing partial frame {to_push.size} to shm'
-                        )
-                        return to_push
-                    else:
-                        return array
-
-                # start history anal and load missing new data via backend.
-                async with open_history_client(fqsn) as hist:
-
-                    # get latest query's worth of history all the way
-                    # back to what is recorded in the tsdb array,
-                    array, start_dt, end_dt = await hist(end_dt='')
-                    to_push = diff_history(array, start_dt, end_dt)
-                    shm.push(to_push)
-
-                    # let caller unblock and deliver latest history frame
-                    task_status.started(shm)
-                    some_data_ready.set()
-
-                    # pull new history frames until we hit latest
-                    # already in the tsdb
-                    while start_dt > last_tsdb_dt:
-                        array, start_dt, end_dt = await hist(end_dt=start_dt)
-                        to_push = diff_history(array, start_dt, end_dt)
-                        shm.push(to_push, prepend=True)
-
-                # TODO: see if there's faster multi-field reads:
-                # https://numpy.org/doc/stable/user/basics.rec.html#accessing-multiple-fields
-                # re-index with a `time` and index field
-                shm.push(
-                    fastest[-shm._first.value:],
-
-                    # insert the history pre a "days worth" of samples
-                    # to leave some real-time buffer space at the end.
-                    prepend=True,
-                    # start=shm._len - _secs_in_day,
-                    field_map={
-                        'Epoch': 'time',
-                        'Open': 'open',
-                        'High': 'high',
-                        'Low': 'low',
-                        'Close': 'close',
-                        'Volume': 'volume',
-                    },
-                )
-
-        # TODO: write new data to tsdb to be ready to for next
-        # read.
-
-    if do_legacy_backfill:
-        # do a legacy incremental backfill from the provider.
+            # start history anal and load missing new data via backend.
+            async with mod.open_history_client(fqsn) as hist:
+
+                # get latest query's worth of history
+                next_dt = await hist(end_dt='')
+
+                last_dt = datetime.fromtimestamp(last_s)
+                array, next_dt = await hist(end_dt=last_dt)
+
+            some_data_ready.set()
+
+    elif opened:
         log.info('No existing `marketstored` found..')
 
-        bfqsn = fqsn.replace('.' + mod.name, '')
         # start history backfill task ``backfill_bars()`` is
         # a required backend func this must block until shm is
        # filled with first set of ohlc bars
-        await bus.nursery.start(
-            start_backfill,
-            mod,
-            bfqsn,
-            shm,
-        )
+        _ = await bus.nursery.start(mod.backfill_bars, fqsn, shm)
 
     # yield back after client connect with filled shm
     task_status.started(shm)
 
     # indicate to caller that feed can be delivered to
     # remote requesting client since we've loaded history
     # data that can be used.
     some_data_ready.set()
 
-    # detect sample step size for sampled historical data
-    times = shm.array['time']
-    delay_s = times[-1] - times[times != times[-1]][-1]
-
-    # begin real-time updates of shm and tsb once the feed
-    # goes live.
-    await feed_is_live.wait()
-
-    if opened:
-        sampler.ohlcv_shms.setdefault(delay_s, []).append(shm)
-
-        # start shm incrementing for OHLC sampling at the current
-        # detected sampling period if one dne.
-        if sampler.incrementers.get(delay_s) is None:
-            await bus.start_task(
-                increment_ohlc_buffer,
-                delay_s,
-            )
-
-    # history retreival loop depending on user interaction and thus
-    # a small RPC-prot for remotely controllinlg what data is loaded
-    # for viewing.
     await trio.sleep_forever()
+    # cs.cancel()
 
 
 async def allocate_persistent_feed(
     bus: _FeedsBus,
     brokername: str,
     symbol: str,
     loglevel: str,
 
     start_stream: bool = True,
@@ -400,7 +337,6 @@ async def allocate_persistent_feed(
     - a real-time streaming task which connec
 
     '''
-    # load backend module
     try:
         mod = get_brokermod(brokername)
     except ImportError:
@@ -425,10 +361,8 @@ async def allocate_persistent_feed(
             loglevel=loglevel,
         )
     )
 
-    # the broker-specific fully qualified symbol name,
-    # but ensure it is lower-cased for external use.
-    bfqsn = init_msg[symbol]['fqsn'].lower()
-    init_msg[symbol]['fqsn'] = bfqsn
+    # the broker-specific fully qualified symbol name
+    bfqsn = init_msg[symbol]['fqsn']
 
     # HISTORY, run 2 tasks:
     # - a history loader / maintainer
@@ -443,7 +377,7 @@ async def allocate_persistent_feed(
             manage_history,
             mod,
             bus,
-            '.'.join((bfqsn, brokername)),
+            bfqsn,
             some_data_ready,
             feed_is_live,
         )
@@ -456,11 +390,9 @@ async def allocate_persistent_feed(
     # true fqsn
     fqsn = '.'.join((bfqsn, brokername))
 
     # add a fqsn entry that includes the ``.<broker>`` suffix
-    # and an entry that includes the broker-specific fqsn (including
-    # any new suffixes or elements as injected by the backend).
     init_msg[fqsn] = msg
-    init_msg[bfqsn] = msg
 
     # TODO: pretty sure we don't need this? why not just leave 1s as
     # the fastest "sample period" since we'll probably always want that
@@ -474,14 +406,13 @@ async def allocate_persistent_feed(
     await some_data_ready.wait()
 
     # append ``.<broker>`` suffix to each quote symbol
-    acceptable_not_fqsn_with_broker_suffix = symbol + f'.{brokername}'
+    bsym = symbol + f'.{brokername}'
 
     generic_first_quotes = {
-        acceptable_not_fqsn_with_broker_suffix: first_quote,
+        bsym: first_quote,
         fqsn: first_quote,
     }
 
-    bus.feeds[symbol] = bus.feeds[bfqsn] = (
+    bus.feeds[symbol] = bus.feeds[fqsn] = (
         init_msg,
         generic_first_quotes,
     )
@@ -494,22 +425,9 @@ async def allocate_persistent_feed(
     if not start_stream:
         await trio.sleep_forever()
 
-    # begin real-time updates of shm and tsb once the feed goes live and
-    # the backend will indicate when real-time quotes have begun.
+    # backend will indicate when real-time quotes have begun.
     await feed_is_live.wait()
 
-    # start shm incrementer task for OHLC style sampling
-    # at the current detected step period.
-    times = shm.array['time']
-    delay_s = times[-1] - times[times != times[-1]][-1]
-
-    sampler.ohlcv_shms.setdefault(delay_s, []).append(shm)
-    if sampler.incrementers.get(delay_s) is None:
-        await bus.start_task(
-            increment_ohlc_buffer,
-            delay_s,
-        )
-
     sum_tick_vlm: bool = init_msg.get(
         'shm_write_opts', {}
     ).get('sum_tick_vlm', True)
@@ -532,7 +450,7 @@ async def open_feed_bus(
 
     ctx: tractor.Context,
     brokername: str,
-    symbol: str,  # normally expected to the broker-specific fqsn
+    symbol: str,
     loglevel: str,
     tick_throttle: Optional[float] = None,
     start_stream: bool = True,
@@ -554,9 +472,7 @@ async def open_feed_bus(
     # TODO: check for any stale shm entries for this symbol
     # (after we also group them in a nice `/dev/shm/piker/` subdir).
     # ensure we are who we think we are
-    servicename = tractor.current_actor().name
-    assert 'brokerd' in servicename
-    assert brokername in servicename
+    assert 'brokerd' in tractor.current_actor().name
 
     bus = get_feed_bus(brokername)
@@ -566,7 +482,7 @@ async def open_feed_bus(
     entry = bus.feeds.get(symbol)
     if entry is None:
         # allocate a new actor-local stream bus which
-        # will persist for this `brokerd`'s service lifetime.
+        # will persist for this `brokerd`.
         async with bus.task_lock:
             await bus.nursery.start(
                 partial(
@@ -590,12 +506,12 @@ async def open_feed_bus(
     init_msg, first_quotes = bus.feeds[symbol]
 
     msg = init_msg[symbol]
-    bfqsn = msg['fqsn'].lower()
+    bfqsn = msg['fqsn']
 
     # true fqsn
     fqsn = '.'.join([bfqsn, brokername])
     assert fqsn in first_quotes
-    assert bus.feeds[bfqsn]
+    assert bus.feeds[fqsn]
 
     # broker-ambiguous symbol (provided on cli - eg. mnq.globex.ib)
     bsym = symbol + f'.{brokername}'
@@ -909,10 +825,7 @@ async def maybe_open_feed(
 
     **kwargs,
 
-) -> (
-    Feed,
-    ReceiveChannel[dict[str, Any]],
-):
+) -> (Feed, ReceiveChannel[dict[str, Any]]):
     '''
     Maybe open a data to a ``brokerd`` daemon only if there is no
     local one for the broker-symbol pair, if one is cached use it wrapped
@@ -933,7 +846,6 @@ async def maybe_open_feed(
             'start_stream': kwargs.get('start_stream', True),
         },
         key=fqsn,
    ) as (cache_hit, feed):
        if cache_hit:

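A standalone sketch of the time-based slicing that the removed `diff_history()` helper (left side of the large hunk above) performs: keep only the rows of a structured OHLC frame that are newer than the last timestamp already held in the tsdb; the epoch values and the two-field dtype are toy data:

import numpy as np

frame = np.array(
    [(1000.0, 1.0), (1001.0, 1.1), (1002.0, 1.2)],
    dtype=[('time', '<f8'), ('close', '<f8')],
)
last_tsdb_ts = 1000.0

# boolean mask over the 'time' column, like
# ``array['time'] > last_tsdb_dt.timestamp()`` in ``diff_history()``
to_push = frame[frame['time'] > last_tsdb_ts]
assert to_push['time'].tolist() == [1001.0, 1002.0]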
View File

@@ -29,13 +29,14 @@ from typing import (
     Any,
     Optional,
     Union,
+    # Callable,
+    # TYPE_CHECKING,
 )
 import time
 from math import isnan
 
 from bidict import bidict
 import msgpack
-import pyqtgraph as pg
 import numpy as np
 import pandas as pd
 import tractor
@@ -48,8 +49,15 @@ from anyio_marketstore import (
 import purerpc
 
 from .feed import maybe_open_feed
+from ._source import (
+    mk_fqsn,
+    # Symbol,
+)
 from ..log import get_logger, get_console_log
 
+# if TYPE_CHECKING:
+#     from ._sharedmem import ShmArray
+
 log = get_logger(__name__)
@@ -227,16 +235,6 @@ class Storage:
         # series' cache from tsdb reads
         self._arrays: dict[str, np.ndarray] = {}
 
-    async def list_keys(self) -> list[str]:
-        return await self.client.list_symbols()
-
-    async def search_keys(self, pattern: str) -> list[str]:
-        '''
-        Search for time series key in the storage backend.
-
-        '''
-        ...
-
     async def write_ticks(self, ticks: list) -> None:
         ...
@@ -258,29 +256,14 @@ class Storage:
         if fqsn not in syms:
             return {}
 
-        tfstr = tf_in_1s[1]
-
-        params = Params(
-            symbols=fqsn,
-            timeframe=tfstr,
-            attrgroup='OHLCV',
-            # limit_from_start=True,
-
-            # TODO: figure the max limit here given the
-            # ``purepc`` msg size limit of purerpc: 33554432
-            limit=int(800e3),
-        )
-
         if timeframe is None:
             log.info(f'starting {fqsn} tsdb granularity scan..')
             # loop through and try to find highest granularity
             for tfstr in tf_in_1s.values():
                 try:
                     log.info(f'querying for {tfstr}@{fqsn}')
-                    params.set('timeframe', tfstr)
-                    result = await client.query(params)
+                    result = await client.query(Params(fqsn, tfstr, 'OHLCV',))
                     break
                 except purerpc.grpclib.exceptions.UnknownError:
                     # XXX: this is already logged by the container and
                     # thus shows up through `marketstored` logs relay.
@@ -290,11 +273,9 @@ class Storage:
                     return {}
 
         else:
-            result = await client.query(params)
+            tfstr = tf_in_1s[timeframe]
+            result = await client.query(Params(fqsn, tfstr, 'OHLCV',))
 
-        # TODO: it turns out column access on recarrays is actually slower:
-        # https://jakevdp.github.io/PythonDataScienceHandbook/02.09-structured-data-numpy.html#RecordArrays:-Structured-Arrays-with-a-Twist
-        # it might make sense to make these structured arrays?
         # Fill out a `numpy` array-results map
         arrays = {}
         for fqsn, data_set in result.by_symbols().items():
@@ -302,22 +283,7 @@ class Storage:
                 tf_in_1s.inverse[data_set.timeframe]
             ] = data_set.array
 
-        return arrays[fqsn][timeframe] if timeframe else arrays[fqsn]
-
-    async def delete_ts(
-        self,
-        key: str,
-        timeframe: Optional[Union[int, str]] = None,
-
-    ) -> bool:
-
-        client = self.client
-        syms = await client.list_symbols()
-        print(syms)
-        # if key not in syms:
-        #     raise KeyError(f'`{fqsn}` table key not found?')
-
-        return await client.destroy(tbk=key)
+        return arrays[fqsn][timeframe] if timeframe else arrays
 
 
 @acm
@@ -330,16 +296,19 @@ async def open_storage_client(
     Load a series by key and deliver in ``numpy`` struct array format.
 
     '''
-    async with (
-        # eventually a storage backend endpoint
-        get_client() as client,
-    ):
-        # slap on our wrapper api
-        yield Storage(client)
-
-
-async def tsdb_history_update(
-    fqsn: str,
+    async with get_client() as client:
+
+        storage_client = Storage(client)
+        arrays = await storage_client.read_ohlcv(
+            fqsn,
+            period,
+        )
+
+        yield storage_client, arrays
+
+
+async def backfill_history_diff(
+    # symbol: Symbol
 
 ) -> list[str]:
@@ -369,95 +338,108 @@ async def tsdb_history_update(
     # * the original data feed arch blurb:
     #   - https://github.com/pikers/piker/issues/98
     #
-    profiler = pg.debug.Profiler(
-        disabled=False,  # not pg_profile_enabled(),
-        delayed=False,
-    )
+    broker = 'ib'
+    symbol = 'mnq.globex'
+    # broker = 'binance'
+    # symbol = 'btcusdt'
+
+    fqsn = mk_fqsn(broker, symbol)
 
     async with (
-        open_storage_client(fqsn) as storage,
+        get_client() as client,
 
         maybe_open_feed(
-            [fqsn],
+            broker,
+            [symbol],
+            loglevel='info',
+            # backpressure=False,
             start_stream=False,
 
         ) as (feed, stream),
     ):
-        profiler(f'opened feed for {fqsn}')
-
-        symbol = feed.symbols.get(fqsn)
-        if symbol:
-            fqsn = symbol.front_fqsn()
-
-        syms = await storage.client.list_symbols()
-        log.info(f'Existing tsdb symbol set:\n{pformat(syms)}')
-        profiler(f'listed symbols {syms}')
+        syms = await client.list_symbols()
+        log.info(f'Existing symbol set:\n{pformat(syms)}')
 
         # diff db history with shm and only write the missing portions
         ohlcv = feed.shm.array
 
-        # TODO: use pg profiler
-        tsdb_arrays = await storage.read_ohlcv(fqsn)
-
-        to_append = feed.shm.array
-        to_prepend = None
-
-        # hist diffing
-        if tsdb_arrays:
-            onesec = tsdb_arrays[1]
-            to_append = ohlcv[ohlcv['time'] > onesec['Epoch'][-1]]
-            to_prepend = ohlcv[ohlcv['time'] < onesec['Epoch'][0]]
-
-        profiler('Finished db arrays diffs')
-
-        for array in [to_append, to_prepend]:
-            if array is None:
-                continue
-
-            log.info(
-                f'Writing datums {array.size} -> to tsdb from shm\n'
-            )
-
-            # build mkts schema compat array for writing
-            mkts_dt = np.dtype(_ohlcv_dt)
-            mkts_array = np.zeros(
-                len(array),
-                dtype=mkts_dt,
-            )
-            # copy from shm array (yes it's this easy):
-            # https://numpy.org/doc/stable/user/basics.rec.html#assignment-from-other-structured-arrays
-            mkts_array[:] = array[[
-                'time',
-                'open',
-                'high',
-                'low',
-                'close',
-                'volume',
-            ]]
-
-            # write to db
-            resp = await storage.client.write(
-                mkts_array,
-                tbk=f'{fqsn}/1Sec/OHLCV',
-
-                # NOTE: will will append duplicates
-                # for the same timestamp-index.
-                # TODO: pre deduplicate?
-                isvariablelength=True,
-            )
-
-            log.info(
-                f'Wrote {to_append.size} datums to tsdb\n'
-            )
-            profiler('Finished db writes')
-
-        for resp in resp.responses:
-            err = resp.error
-            if err:
-                raise MarketStoreError(err)
-
-        from tractor.trionics import ipython_embed
-        await ipython_embed()
+        key = (fqsn, '1Sec', 'OHLCV')
+        tbk = mk_tbk(key)
+
+        # diff vs. existing array and append new history
+        # TODO:
+
+        # TODO: should be no error?
+        # assert not resp.responses
+
+        start = time.time()
+
+        qr = await client.query(
+            # Params(fqsn, '1Sec`', 'OHLCV',)
+            Params(*key),
+        )
+        # # Dig out `numpy` results map
+        arrays: dict[tuple[str, int], np.ndarray] = {}
+        for name, data_set in qr.by_symbols().items():
+            in_secs = tf_in_1s.inverse[data_set.timeframe]
+            arrays[(name, in_secs)] = data_set.array
+
+        s1 = arrays[(fqsn, 1)]
+        to_append = ohlcv[ohlcv['time'] > s1['Epoch'][-1]]
+
+        end_diff = time.time()
+        diff_ms = round((end_diff - start) * 1e3, ndigits=2)
+
+        log.info(
+            f'Appending {to_append.size} datums to tsdb from shm\n'
+            f'Total diff time: {diff_ms} ms'
+        )
+
+        # build mkts schema compat array for writing
+        mkts_dt = np.dtype(_ohlcv_dt)
+        mkts_array = np.zeros(
+            len(to_append),
+            dtype=mkts_dt,
+        )
+        # copy from shm array (yes it's this easy):
+        # https://numpy.org/doc/stable/user/basics.rec.html#assignment-from-other-structured-arrays
+        mkts_array[:] = to_append[[
+            'time',
+            'open',
+            'high',
+            'low',
+            'close',
+            'volume',
+        ]]
+
+        # write to db
+        resp = await client.write(
+            mkts_array,
+            tbk=tbk,
+
+            # NOTE: will will append duplicates
+            # for the same timestamp-index.
+            # TODO: pre deduplicate?
+            isvariablelength=True,
+        )
+
+        end_write = time.time()
+        diff_ms = round((end_write - end_diff) * 1e3, ndigits=2)
+        log.info(
+            f'Wrote {to_append.size} datums to tsdb\n'
+            f'Total write time: {diff_ms} ms'
+        )
+
+        for resp in resp.responses:
+            err = resp.error
+            if err:
+                raise MarketStoreError(err)
+
+        # TODO: backfiller loop
+        from piker.ui._compression import downsample
+        x, y = downsample(
+            s1['Epoch'],
+            s1['Close'],
+            bins=10,
+        )
+        await tractor.breakpoint()
 
 
 async def ingest_quote_stream(

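A sketch of the "copy from shm array" step that both versions of the write path above rely on: multi-field selection plus structured-array assignment builds the marketstore-shaped buffer in one statement. The `mkts_dt` field layout below is an assumption for illustration only; the real layout lives in `piker.data.marketstore._ohlcv_dt`:

import numpy as np

# stand-in for a slice of ``feed.shm.array``
shm_like = np.array(
    [(1640995200.0, 1.0, 1.5, 0.5, 1.2, 100.0)],
    dtype=[
        ('time', '<f8'), ('open', '<f8'), ('high', '<f8'),
        ('low', '<f8'), ('close', '<f8'), ('volume', '<f8'),
    ],
)

mkts_dt = np.dtype([
    ('Epoch', 'i8'), ('Open', 'f4'), ('High', 'f4'),
    ('Low', 'f4'), ('Close', 'f4'), ('Volume', 'f4'),
])
mkts_array = np.zeros(len(shm_like), dtype=mkts_dt)

# field-by-position assignment between structured arrays:
# https://numpy.org/doc/stable/user/basics.rec.html#assignment-from-other-structured-arrays
mkts_array[:] = shm_like[['time', 'open', 'high', 'low', 'close', 'volume']]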
View File

@@ -54,7 +54,7 @@ for name in win_names:
     # disconnect?
     for key_combo, timeout in [
         # only required if we need a connection reset.
-        # ('ctrl+alt+r', 12),
+        ('ctrl+alt+r', 12),
         # data feed reset.
         ('ctrl+alt+f', 6)
     ]: