Compare commits

...

10 Commits

Author SHA1 Message Date
Tyler Goodlet 381fa11458 More reliable `marketstored` + container supervision
It turns out (i guess not so shockingly?) that `marketstore` doesn't
always teardown "gracefully" under SIGINT (seems to hang if there are
open client connections which are also in the midst of teardown?) so
this instead first tries the SIGINT and then fails over to a SIGKILL
(destroy loop) which seems to be much more reliable to ensure shutdown
without any downside - in terms of a "hard kill".

Originally i was thinking the issue was root perms related (which get
relegated solely to the `marketstored` daemon actor after spawn) but
actually it was indeed the signalling / application layer causing the
hold-up/latency on teardown. There's a bunch of lingering (now
commented) code which tried to solve this non-problem as well as a bunch
logging/prints to help decipher the root of the issue - this will all
get cleaned out shortly.
2022-05-09 11:09:00 -04:00
Tyler Goodlet 7bb54f61b7 Add `docker` as `tsdb` extras dep 2022-05-09 11:09:00 -04:00
Tyler Goodlet c2c3516b34 Add `anyio-marketstore` client as dev dep 2022-05-09 11:09:00 -04:00
Tyler Goodlet 1f58f5daba Handle non-fqsn for derivs and don't put brokername in 2022-05-09 11:09:00 -04:00
Tyler Goodlet eb4b63a710 Limit ohlc queries to 800k datums to avoid `purepc` size error 2022-05-09 11:09:00 -04:00
Tyler Goodlet 6971eca46c Get sync-to-marketstore-tsdb history retrieval workinnn 2022-05-09 11:09:00 -04:00
Tyler Goodlet 033a32cff1 Handle "fatal" level log msgs in docker super 2022-05-09 11:09:00 -04:00
Tyler Goodlet df8054b7a4 Add basic tsdb history loading
If `marketstore` is detected try to only load most recent missing data
from the data provider (broker) and the rest from the tsdb and push it
all to shm for display in the UI. If the provider/broker doesn't have
the history client endpoint, just use the old one for now so we can
start to incrementally add support. Don't start the ohlc step
incrementer task until the backend signals that the feed is live.
2022-05-09 11:09:00 -04:00
Tyler Goodlet 288eda195f Drop `ms-shell`, add `piker storesh` cmd 2022-05-09 11:09:00 -04:00
Tyler Goodlet 17f469d619 Add diffing logic to `tsdb_history_update()`
Add some basic `numpy` epoch slice logic to generate append and prepend
arrays to write to the db.

Mooar cool things,
- add a `Storage.delete_ts()` method to wipe a column series from the db
  easily.
- don't attempt to read in any OHLC series by default on client load
- add some `pyqtgraph` profiling and drop manual latency measures
- if no db series for the fqsn exists write the entire shm array
2022-05-09 11:09:00 -04:00
6 changed files with 461 additions and 318 deletions

View File

@ -24,14 +24,13 @@ from typing import (
# Any,
)
from contextlib import asynccontextmanager as acm
# import time
import trio
from trio_typing import TaskStatus
import tractor
import docker
import json
from docker.models.containers import Container
from docker.models.containers import Container as DockerContainer
from docker.errors import DockerException, APIError
from requests.exceptions import ConnectionError, ReadTimeout
@ -97,6 +96,7 @@ async def open_docker(
base_url=url,
**kwargs
) if url else docker.from_env(**kwargs)
yield client
except (
@ -127,43 +127,140 @@ async def open_docker(
finally:
if client:
# for c in client.containers.list():
# c.kill()
client.close()
# client.api._custom_adapter.close()
for c in client.containers.list():
c.kill()
# async def waitfor(
# cntr: Container,
# attr_path: tuple[str],
# expect=None,
# timeout: float = 0.5,
class Container:
'''
Wrapper around a ``docker.models.containers.Container`` to include
log capture and relay through our native logging system and helper
method(s) for cancellation/teardown.
# ) -> Any:
# '''
# Wait for a container's attr value to be set. If ``expect`` is
# provided wait for the value to be set to that value.
'''
def __init__(
self,
cntr: DockerContainer,
) -> None:
# This is an async version of the helper from our ``pytest-dockerctl``
# plugin.
self.cntr = cntr
# log msg de-duplication
self.seen_so_far = set()
# '''
# def get(val, path):
# for key in path:
# val = val[key]
# return val
async def process_logs_until(
self,
patt: str,
bp_on_msg: bool = False,
) -> bool:
'''
Attempt to capture container log messages and relay through our
native logging system.
# start = time.time()
# while time.time() - start < timeout:
# cntr.reload()
# val = get(cntr.attrs, attr_path)
# if expect is None and val:
# return val
# elif val == expect:
# return val
# else:
# raise TimeoutError("{} failed to be {}, value: \"{}\"".format(
# attr_path, expect if expect else 'not None', val))
'''
seen_so_far = self.seen_so_far
while True:
logs = self.cntr.logs()
entries = logs.decode().split('\n')
for entry in entries:
# ignore null lines
if not entry:
continue
try:
record = json.loads(entry.strip())
except json.JSONDecodeError:
if 'Error' in entry:
raise RuntimeError(entry)
raise
msg = record['msg']
level = record['level']
if msg and entry not in seen_so_far:
seen_so_far.add(entry)
if bp_on_msg:
await tractor.breakpoint()
getattr(log, level, log.error)(f'{msg}')
if patt in msg:
return True
# do a checkpoint so we don't block if cancelled B)
await trio.sleep(0.01)
return False
def try_signal(
self,
signal: str = 'SIGINT',
) -> bool:
try:
# XXX: market store doesn't seem to shutdown nicely all the
# time with this (maybe because there are still open grpc
# connections?) noticably after client connections have been
# made or are in use/teardown. It works just fine if you
# just start and stop the container tho?..
log.cancel(f'SENDING {signal} to {self.cntr.id}')
self.cntr.kill(signal)
return True
except docker.errors.APIError as err:
# _err = err
if 'is not running' in err.explanation:
return False
async def cancel(
self,
) -> None:
cid = self.cntr.id
self.try_signal('SIGINT')
with trio.move_on_after(0.5) as cs:
cs.shield = True
# print('PROCESSINGN LOGS')
await self.process_logs_until('initiating graceful shutdown')
# print('SHUTDOWN REPORTED BY CONTAINER')
await self.process_logs_until('exiting...',)
for _ in range(10):
with trio.move_on_after(0.5) as cs:
cs.shield = True
# print('waiting on EXITING')
await self.process_logs_until('exiting...',)
# print('got EXITING')
break
if cs.cancelled_caught:
# get out the big guns, bc apparently marketstore
# doesn't actually know how to terminate gracefully
# :eyeroll:...
self.try_signal('SIGKILL')
try:
log.info('Waiting on container shutdown: {cid}')
self.cntr.wait(
timeout=0.1,
condition='not-running',
)
break
except (
ReadTimeout,
ConnectionError,
):
log.error(f'failed to wait on container {cid}')
raise
else:
raise RuntimeError('Failed to cancel container {cid}')
log.cancel(f'Container stopped: {cid}')
@tractor.context
@ -208,7 +305,7 @@ async def open_marketstored(
type='bind',
)
cntr: Container = client.containers.run(
dcntr: DockerContainer = client.containers.run(
'alpacamarkets/marketstore:latest',
# do we need this for cmds?
# '-i',
@ -220,79 +317,63 @@ async def open_marketstored(
},
mounts=[config_dir_mnt, data_dir_mnt],
detach=True,
stop_signal='SIGINT',
# stop_signal='SIGINT',
init=True,
# remove=True,
)
try:
seen_so_far = set()
cntr = Container(dcntr)
async def process_logs_until(
match: str,
bp_on_msg: bool = False,
):
logs = cntr.logs(stream=True)
for entry in logs:
entry = entry.decode()
with trio.move_on_after(1):
found = await cntr.process_logs_until(
"launching tcp listener for all services...",
)
try:
record = json.loads(entry.strip())
except json.JSONDecodeError:
if 'Error' in entry:
raise RuntimeError(entry)
msg = record['msg']
level = record['level']
if msg and entry not in seen_so_far:
seen_so_far.add(entry)
if bp_on_msg:
await tractor.breakpoint()
getattr(log, level)(f'{msg}')
# if "launching tcp listener for all services..." in msg:
if match in msg:
return True
# do a checkpoint so we don't block if cancelled B)
await trio.sleep(0)
return False
with trio.move_on_after(0.5):
found = await process_logs_until(
"launching tcp listener for all services...",
if not found and cntr not in client.containers.list():
raise RuntimeError(
'Failed to start `marketstore` check logs deats'
)
if not found and cntr not in client.containers.list():
raise RuntimeError(
'Failed to start `marketstore` check logs deats'
)
await ctx.started((cntr.cntr.id, os.getpid()))
await ctx.started(cntr.id)
await process_logs_until('exiting...',)
# async with ctx.open_stream() as stream:
try:
# TODO: we might eventually want a proxy-style msg-prot here
# to allow remote control of containers without needing
# callers to have root perms?
await trio.sleep_forever()
# await cntr.cancel()
# with trio.CancelScope(shield=True):
# # block for the expected "teardown log msg"..
# # await cntr.process_logs_until('exiting...',)
# # only msg should be to signal killing the
# # container and this super daemon.
# msg = await stream.receive()
# # print("GOT CANCEL MSG")
# cid = msg['cancel']
# log.cancel(f'Cancelling container {cid}')
# # print("CANCELLING CONTAINER")
# await cntr.cancel()
# # print("SENDING ACK")
# await stream.send('ack')
except (
BaseException,
# trio.Cancelled,
# KeyboardInterrupt,
):
cntr.kill('SIGINT')
with trio.move_on_after(0.5) as cs:
cs.shield = True
await process_logs_until('exiting...',)
raise
finally:
try:
cntr.wait(
timeout=0.5,
condition='not-running',
)
except (
ReadTimeout,
ConnectionError,
):
cntr.kill()
with trio.CancelScope(shield=True):
await cntr.cancel()
# await stream.send('ack')
raise
async def start_ahab(
@ -342,9 +423,18 @@ async def start_ahab(
open_marketstored,
) as (ctx, first):
assert str(first)
# run till cancelled
cid, pid = first
await trio.sleep_forever()
# async with ctx.open_stream() as stream:
# try:
# # run till cancelled
# await trio.sleep_forever()
# finally:
# with trio.CancelScope(shield=True):
# # print('SENDING CANCEL TO MARKETSTORED')
# await stream.send({'cancel': (cid, pid)})
# assert await stream.receive() == 'ack'
# since we demoted root perms in this parent
# we'll get a perms error on proc cleanup in

View File

@ -126,7 +126,7 @@ def ms_stream(
)
@click.argument('symbols', nargs=-1)
@click.pass_obj
def ms_shell(
def storesh(
config,
tl,
host,
@ -137,43 +137,18 @@ def ms_shell(
Start an IPython shell ready to query the local marketstore db.
'''
from piker.data.marketstore import backfill_history_diff
from piker.data.marketstore import tsdb_history_update
from piker._daemon import open_piker_runtime
async def main():
nonlocal symbols
async with open_piker_runtime(
'ms_shell',
'storesh',
enable_modules=['piker.data._ahab'],
):
try:
await backfill_history_diff()
except OSError:
# TODO: write magics to query marketstore
sym = symbols[0]
symbol, _, broker = sym.rpartition('.')
# (maybe) allocate shm array for this broker/symbol which will
# be used for fast near-term history capture and processing.
shm, opened = maybe_open_shm_array(
key=sym,
dtype=base_iohlc_dtype,
)
# load anything found in shm
from numpy.lib.recfunctions import structured_to_unstructured
mxmn = structured_to_unstructured(
shm.array[['low', 'high']],
# dtype=[('mxmn', '<f8'), ('index', '<i8')],
).flatten()
from piker.ui._compression import downsample
xd, yd = downsample(
y=mxmn,
x=np.arange(len(mxmn)),
bins=4,
)
await tractor.breakpoint()
symbol = symbols[0]
await tsdb_history_update(symbol)
trio.run(main)
@ -184,10 +159,11 @@ def ms_shell(
@click.argument('name', nargs=1, required=True)
@click.pass_obj
def ingest(config, name, test_file, tl):
"""Ingest real-time broker quotes and ticks to a marketstore instance.
"""
'''
Ingest real-time broker quotes and ticks to a marketstore instance.
'''
# global opts
brokermods = config['brokermods']
loglevel = config['loglevel']
tractorloglevel = config['tractorloglevel']
# log = config['log']

View File

@ -22,7 +22,6 @@ This module is enabled for ``brokerd`` daemons.
"""
from __future__ import annotations
from dataclasses import dataclass, field
from datetime import datetime
from contextlib import asynccontextmanager
from functools import partial
from types import ModuleType
@ -37,6 +36,7 @@ from trio.abc import ReceiveChannel
from trio_typing import TaskStatus
import tractor
from pydantic import BaseModel
import numpy as np
from ..brokers import get_brokermod
from .._cacheables import maybe_open_context
@ -49,7 +49,6 @@ from ._sharedmem import (
maybe_open_shm_array,
attach_shm_array,
ShmArray,
_secs_in_day,
)
from .ingest import get_ingestormod
from ._source import (
@ -236,125 +235,150 @@ async def manage_history(
# we expect the sub-actor to write
readonly=False,
)
# TODO: history validation
if not opened:
raise RuntimeError(
"Persistent shm for sym was already open?!"
)
log.info('Scanning for existing `marketstored`')
is_up = await check_for_service('marketstored')
# for now only do backfilling if no tsdb can be found
do_backfill = not is_up and opened
do_legacy_backfill = not is_up and opened
open_history_client = getattr(mod, 'open_history_client', None)
if is_up and opened and open_history_client:
if is_up and opened:
log.info('Found existing `marketstored`')
from . import marketstore
async with marketstore.open_storage_client(
fqsn,
) as (storage, tsdb_arrays):
) as storage:
# TODO: get this shit workin
from tractor.trionics import ipython_embed
await ipython_embed()
# await ipython_embed(ns=locals())
tsdb_arrays = await storage.read_ohlcv(fqsn)
# TODO: history validation
# assert opened, f'Persistent shm for {symbol} was already open?!'
# if not opened:
# raise RuntimeError(
# "Persistent shm for sym was already open?!"
# )
if not tsdb_arrays:
do_legacy_backfill = True
if tsdb_arrays:
else:
log.info(f'Loaded tsdb history {tsdb_arrays}')
fastest = list(tsdb_arrays[fqsn].values())[0]
last_s = fastest['Epoch'][-1]
# TODO: see if there's faster multi-field reads:
# https://numpy.org/doc/stable/user/basics.rec.html#accessing-multiple-fields
# re-index with a `time` and index field
shm.push(
fastest[-3 * _secs_in_day:],
# insert the history pre a "days worth" of samples
# to leave some real-time buffer space at the end.
prepend=True,
start=shm._len - _secs_in_day,
field_map={
'Epoch': 'time',
'Open': 'open',
'High': 'high',
'Low': 'low',
'Close': 'close',
'Volume': 'volume',
},
fastest = list(tsdb_arrays.values())[0]
times = fastest['Epoch']
first, last = times[0], times[-1]
first_tsdb_dt, last_tsdb_dt = map(
pendulum.from_timestamp, [first, last]
)
# TODO: this should be used verbatim for the pure
# shm backfiller approach below.
def diff_history(
array,
start_dt,
end_dt,
) -> np.ndarray:
s_diff = (last_tsdb_dt - start_dt).seconds
# if we detect a partial frame's worth of data
# that is new, slice out only that history and
# write to shm.
if s_diff > 0:
assert last_tsdb_dt > start_dt
selected = array['time'] > last_tsdb_dt.timestamp()
to_push = array[selected]
log.info(
f'Pushing partial frame {to_push.size} to shm'
)
return to_push
else:
return array
# start history anal and load missing new data via backend.
async with mod.open_history_client(fqsn) as hist:
async with open_history_client(fqsn) as hist:
# get latest query's worth of history
array, next_dt = await hist(end_dt='')
# get latest query's worth of history all the way
# back to what is recorded in the tsdb
array, start_dt, end_dt = await hist(end_dt='')
to_push = diff_history(array, start_dt, end_dt)
shm.push(to_push)
last_dt = datetime.fromtimestamp(last_s)
array, next_dt = await hist(end_dt=last_dt)
else:
do_backfill = True
# let caller unblock and deliver latest history frame
task_status.started(shm)
some_data_ready.set()
# await tractor.breakpoint()
# pull new history frames until we hit latest
# already in the tsdb
while start_dt > last_tsdb_dt:
array, start_dt, end_dt = await hist(end_dt=start_dt)
to_push = diff_history(array, start_dt, end_dt)
shm.push(to_push, prepend=True)
some_data_ready.set()
# TODO: see if there's faster multi-field reads:
# https://numpy.org/doc/stable/user/basics.rec.html#accessing-multiple-fields
# re-index with a `time` and index field
shm.push(
fastest[-shm._first.value:],
if do_backfill:
# insert the history pre a "days worth" of samples
# to leave some real-time buffer space at the end.
prepend=True,
# start=shm._len - _secs_in_day,
field_map={
'Epoch': 'time',
'Open': 'open',
'High': 'high',
'Low': 'low',
'Close': 'close',
'Volume': 'volume',
},
)
# TODO: write new data to tsdb to be ready to for next
# read.
if do_legacy_backfill:
# do a legacy incremental backfill from the provider.
log.info('No existing `marketstored` found..')
bfqsn = fqsn.replace('.' + mod.name, '')
# start history backfill task ``backfill_bars()`` is
# a required backend func this must block until shm is
# filled with first set of ohlc bars
await bus.nursery.start(
start_backfill,
mod,
fqsn,
bfqsn,
shm,
)
# _ = await bus.nursery.start(mod.backfill_bars, fqsn, shm)
# yield back after client connect with filled shm
task_status.started(shm)
# yield back after client connect with filled shm
task_status.started(shm)
# indicate to caller that feed can be delivered to
# remote requesting client since we've loaded history
# data that can be used.
some_data_ready.set()
# detect sample step size for sampled historical data
times = shm.array['time']
delay_s = times[-1] - times[times != times[-1]][-1]
# begin real-time updates of shm and tsb once the feed
# goes live.
await feed_is_live.wait()
if opened:
sampler.ohlcv_shms.setdefault(delay_s, []).append(shm)
# start shm incrementing for OHLC sampling at the current
# detected sampling period if one dne.
if sampler.incrementers.get(delay_s) is None:
await bus.start_task(
increment_ohlc_buffer,
delay_s,
)
# indicate to caller that feed can be delivered to
# remote requesting client since we've loaded history
# data that can be used.
some_data_ready.set()
# history retreival loop depending on user interaction and thus
# a small RPC-prot for remotely controllinlg what data is loaded
# for viewing.
await trio.sleep_forever()
# cs.cancel()
async def allocate_persistent_feed(
bus: _FeedsBus,
brokername: str,
symbol: str,
loglevel: str,
start_stream: bool = True,
@ -374,6 +398,7 @@ async def allocate_persistent_feed(
- a real-time streaming task which connec
'''
# load backend module
try:
mod = get_brokermod(brokername)
except ImportError:
@ -416,7 +441,7 @@ async def allocate_persistent_feed(
manage_history,
mod,
bus,
bfqsn,
'.'.join((bfqsn, brokername)),
some_data_ready,
feed_is_live,
)
@ -429,9 +454,11 @@ async def allocate_persistent_feed(
# true fqsn
fqsn = '.'.join((bfqsn, brokername))
# add a fqsn entry that includes the ``.<broker>`` suffix
# and an entry that includes the broker-specific fqsn (including
# any new suffixes or elements as injected by the backend).
init_msg[fqsn] = msg
init_msg[bfqsn] = msg
# TODO: pretty sure we don't need this? why not just leave 1s as
# the fastest "sample period" since we'll probably always want that
@ -445,13 +472,14 @@ async def allocate_persistent_feed(
await some_data_ready.wait()
# append ``.<broker>`` suffix to each quote symbol
bsym = symbol + f'.{brokername}'
acceptable_not_fqsn_with_broker_suffix = symbol + f'.{brokername}'
generic_first_quotes = {
bsym: first_quote,
acceptable_not_fqsn_with_broker_suffix: first_quote,
fqsn: first_quote,
}
bus.feeds[symbol] = bus.feeds[fqsn] = (
bus.feeds[symbol] = bus.feeds[bfqsn] = (
init_msg,
generic_first_quotes,
)
@ -464,9 +492,22 @@ async def allocate_persistent_feed(
if not start_stream:
await trio.sleep_forever()
# backend will indicate when real-time quotes have begun.
# begin real-time updates of shm and tsb once the feed goes live and
# the backend will indicate when real-time quotes have begun.
await feed_is_live.wait()
# start shm incrementer task for OHLC style sampling
# at the current detected step period.
times = shm.array['time']
delay_s = times[-1] - times[times != times[-1]][-1]
sampler.ohlcv_shms.setdefault(delay_s, []).append(shm)
if sampler.incrementers.get(delay_s) is None:
await bus.start_task(
increment_ohlc_buffer,
delay_s,
)
sum_tick_vlm: bool = init_msg.get(
'shm_write_opts', {}
).get('sum_tick_vlm', True)
@ -489,7 +530,7 @@ async def open_feed_bus(
ctx: tractor.Context,
brokername: str,
symbol: str,
symbol: str, # normally expected to the broker-specific fqsn
loglevel: str,
tick_throttle: Optional[float] = None,
start_stream: bool = True,
@ -511,7 +552,9 @@ async def open_feed_bus(
# TODO: check for any stale shm entries for this symbol
# (after we also group them in a nice `/dev/shm/piker/` subdir).
# ensure we are who we think we are
assert 'brokerd' in tractor.current_actor().name
servicename = tractor.current_actor().name
assert 'brokerd' in servicename
assert brokername in servicename
bus = get_feed_bus(brokername)
@ -521,7 +564,7 @@ async def open_feed_bus(
entry = bus.feeds.get(symbol)
if entry is None:
# allocate a new actor-local stream bus which
# will persist for this `brokerd`.
# will persist for this `brokerd`'s service lifetime.
async with bus.task_lock:
await bus.nursery.start(
partial(
@ -545,12 +588,12 @@ async def open_feed_bus(
init_msg, first_quotes = bus.feeds[symbol]
msg = init_msg[symbol]
bfqsn = msg['fqsn']
bfqsn = msg['fqsn'].lower()
# true fqsn
fqsn = '.'.join([bfqsn, brokername])
assert fqsn in first_quotes
assert bus.feeds[fqsn]
assert bus.feeds[bfqsn]
# broker-ambiguous symbol (provided on cli - eg. mnq.globex.ib)
bsym = symbol + f'.{brokername}'
@ -864,7 +907,10 @@ async def maybe_open_feed(
**kwargs,
) -> (Feed, ReceiveChannel[dict[str, Any]]):
) -> (
Feed,
ReceiveChannel[dict[str, Any]],
):
'''
Maybe open a data to a ``brokerd`` daemon only if there is no
local one for the broker-symbol pair, if one is cached use it wrapped
@ -885,6 +931,7 @@ async def maybe_open_feed(
'start_stream': kwargs.get('start_stream', True),
},
key=fqsn,
) as (cache_hit, feed):
if cache_hit:

View File

@ -29,14 +29,13 @@ from typing import (
Any,
Optional,
Union,
# Callable,
# TYPE_CHECKING,
)
import time
from math import isnan
from bidict import bidict
import msgpack
import pyqtgraph as pg
import numpy as np
import pandas as pd
import tractor
@ -49,15 +48,8 @@ from anyio_marketstore import (
import purerpc
from .feed import maybe_open_feed
from ._source import (
mk_fqsn,
# Symbol,
)
from ..log import get_logger, get_console_log
# if TYPE_CHECKING:
# from ._sharedmem import ShmArray
log = get_logger(__name__)
@ -235,6 +227,16 @@ class Storage:
# series' cache from tsdb reads
self._arrays: dict[str, np.ndarray] = {}
async def list_keys(self) -> list[str]:
return await self.client.list_symbols()
async def search_keys(self, pattern: str) -> list[str]:
'''
Search for time series key in the storage backend.
'''
...
async def write_ticks(self, ticks: list) -> None:
...
@ -256,14 +258,29 @@ class Storage:
if fqsn not in syms:
return {}
tfstr = tf_in_1s[1]
params = Params(
symbols=fqsn,
timeframe=tfstr,
attrgroup='OHLCV',
# limit_from_start=True,
# TODO: figure the max limit here given the
# ``purepc`` msg size limit of purerpc: 33554432
limit=int(800e3),
)
if timeframe is None:
log.info(f'starting {fqsn} tsdb granularity scan..')
# loop through and try to find highest granularity
for tfstr in tf_in_1s.values():
try:
log.info(f'querying for {tfstr}@{fqsn}')
result = await client.query(Params(fqsn, tfstr, 'OHLCV',))
params.set('timeframe', tfstr)
result = await client.query(params)
break
except purerpc.grpclib.exceptions.UnknownError:
# XXX: this is already logged by the container and
# thus shows up through `marketstored` logs relay.
@ -273,9 +290,11 @@ class Storage:
return {}
else:
tfstr = tf_in_1s[timeframe]
result = await client.query(Params(fqsn, tfstr, 'OHLCV',))
result = await client.query(params)
# TODO: it turns out column access on recarrays is actually slower:
# https://jakevdp.github.io/PythonDataScienceHandbook/02.09-structured-data-numpy.html#RecordArrays:-Structured-Arrays-with-a-Twist
# it might make sense to make these structured arrays?
# Fill out a `numpy` array-results map
arrays = {}
for fqsn, data_set in result.by_symbols().items():
@ -283,7 +302,22 @@ class Storage:
tf_in_1s.inverse[data_set.timeframe]
] = data_set.array
return arrays[fqsn][timeframe] if timeframe else arrays
return arrays[fqsn][timeframe] if timeframe else arrays[fqsn]
async def delete_ts(
self,
key: str,
timeframe: Optional[Union[int, str]] = None,
) -> bool:
client = self.client
syms = await client.list_symbols()
print(syms)
# if key not in syms:
# raise KeyError(f'`{fqsn}` table key not found?')
return await client.destroy(tbk=key)
@acm
@ -296,19 +330,16 @@ async def open_storage_client(
Load a series by key and deliver in ``numpy`` struct array format.
'''
async with get_client() as client:
storage_client = Storage(client)
arrays = await storage_client.read_ohlcv(
fqsn,
period,
)
yield storage_client, arrays
async with (
# eventually a storage backend endpoint
get_client() as client,
):
# slap on our wrapper api
yield Storage(client)
async def backfill_history_diff(
# symbol: Symbol
async def tsdb_history_update(
fqsn: str,
) -> list[str]:
@ -338,108 +369,95 @@ async def backfill_history_diff(
# * the original data feed arch blurb:
# - https://github.com/pikers/piker/issues/98
#
broker = 'ib'
symbol = 'mnq.globex'
# broker = 'binance'
# symbol = 'btcusdt'
fqsn = mk_fqsn(broker, symbol)
profiler = pg.debug.Profiler(
disabled=False, # not pg_profile_enabled(),
delayed=False,
)
async with (
get_client() as client,
open_storage_client(fqsn) as storage,
maybe_open_feed(
broker,
[symbol],
loglevel='info',
# backpressure=False,
[fqsn],
start_stream=False,
) as (feed, stream),
):
syms = await client.list_symbols()
log.info(f'Existing symbol set:\n{pformat(syms)}')
profiler(f'opened feed for {fqsn}')
symbol = feed.symbols.get(fqsn)
if symbol:
fqsn = symbol.front_fqsn()
syms = await storage.client.list_symbols()
log.info(f'Existing tsdb symbol set:\n{pformat(syms)}')
profiler(f'listed symbols {syms}')
# diff db history with shm and only write the missing portions
ohlcv = feed.shm.array
key = (fqsn, '1Sec', 'OHLCV')
tbk = mk_tbk(key)
# TODO: use pg profiler
tsdb_arrays = await storage.read_ohlcv(fqsn)
# diff vs. existing array and append new history
# TODO:
to_append = feed.shm.array
to_prepend = None
# TODO: should be no error?
# assert not resp.responses
# hist diffing
if tsdb_arrays:
onesec = tsdb_arrays[1]
to_append = ohlcv[ohlcv['time'] > onesec['Epoch'][-1]]
to_prepend = ohlcv[ohlcv['time'] < onesec['Epoch'][0]]
start = time.time()
profiler('Finished db arrays diffs')
qr = await client.query(
# Params(fqsn, '1Sec`', 'OHLCV',)
Params(*key),
)
# # Dig out `numpy` results map
arrays: dict[tuple[str, int], np.ndarray] = {}
for name, data_set in qr.by_symbols().items():
in_secs = tf_in_1s.inverse[data_set.timeframe]
arrays[(name, in_secs)] = data_set.array
for array in [to_append, to_prepend]:
if array is None:
continue
s1 = arrays[(fqsn, 1)]
to_append = ohlcv[ohlcv['time'] > s1['Epoch'][-1]]
log.info(
f'Writing datums {array.size} -> to tsdb from shm\n'
)
end_diff = time.time()
diff_ms = round((end_diff - start) * 1e3, ndigits=2)
# build mkts schema compat array for writing
mkts_dt = np.dtype(_ohlcv_dt)
mkts_array = np.zeros(
len(array),
dtype=mkts_dt,
)
# copy from shm array (yes it's this easy):
# https://numpy.org/doc/stable/user/basics.rec.html#assignment-from-other-structured-arrays
mkts_array[:] = array[[
'time',
'open',
'high',
'low',
'close',
'volume',
]]
log.info(
f'Appending {to_append.size} datums to tsdb from shm\n'
f'Total diff time: {diff_ms} ms'
)
# write to db
resp = await storage.client.write(
mkts_array,
tbk=f'{fqsn}/1Sec/OHLCV',
# build mkts schema compat array for writing
mkts_dt = np.dtype(_ohlcv_dt)
mkts_array = np.zeros(
len(to_append),
dtype=mkts_dt,
)
# copy from shm array (yes it's this easy):
# https://numpy.org/doc/stable/user/basics.rec.html#assignment-from-other-structured-arrays
mkts_array[:] = to_append[[
'time',
'open',
'high',
'low',
'close',
'volume',
]]
# NOTE: will will append duplicates
# for the same timestamp-index.
# TODO: pre deduplicate?
isvariablelength=True,
)
# write to db
resp = await client.write(
mkts_array,
tbk=tbk,
# NOTE: will will append duplicates
# for the same timestamp-index.
isvariablelength=True,
)
end_write = time.time()
diff_ms = round((end_write - end_diff) * 1e3, ndigits=2)
log.info(
f'Wrote {to_append.size} datums to tsdb\n'
f'Total write time: {diff_ms} ms'
)
for resp in resp.responses:
err = resp.error
if err:
raise MarketStoreError(err)
log.info(
f'Wrote {to_append.size} datums to tsdb\n'
)
profiler('Finished db writes')
# TODO: backfiller loop
from piker.ui._compression import downsample
x, y = downsample(
s1['Epoch'],
s1['Close'],
bins=10,
)
await tractor.breakpoint()
for resp in resp.responses:
err = resp.error
if err:
raise MarketStoreError(err)
from tractor.trionics import ipython_embed
await ipython_embed()
async def ingest_quote_stream(

View File

@ -7,3 +7,7 @@
# pin this to a dev branch that we have more control over especially
# as more graphics stuff gets hashed out.
-e git+https://github.com/pikers/pyqtgraph.git@piker_pin#egg=pyqtgraph
# our async client for ``marketstore`` (the tsdb)
-e git+https://github.com/pikers/anyio-marketstore.git@master#egg=anyio-marketstore

View File

@ -77,6 +77,14 @@ setup(
# tsdbs
'pymarketstore',
],
extras_require={
# serialization
'tsdb': [
'docker',
],
},
tests_require=['pytest'],
python_requires=">=3.9", # literally for ``datetime.datetime.fromisoformat``...
keywords=["async", "trading", "finance", "quant", "charting"],