Compare commits


No commits in common. "381fa1145822c05f242bbfe97e1b0daf8be5dc80" and "94cba54beb85f86f030a41e021d0d2518a07e1b6" have entirely different histories.

6 changed files with 318 additions and 461 deletions

View File

@ -24,13 +24,14 @@ from typing import (
# Any,
)
from contextlib import asynccontextmanager as acm
# import time
import trio
from trio_typing import TaskStatus
import tractor
import docker
import json
from docker.models.containers import Container as DockerContainer
from docker.models.containers import Container
from docker.errors import DockerException, APIError
from requests.exceptions import ConnectionError, ReadTimeout
@ -96,7 +97,6 @@ async def open_docker(
base_url=url,
**kwargs
) if url else docker.from_env(**kwargs)
yield client
except (
@ -127,140 +127,43 @@ async def open_docker(
finally:
if client:
# for c in client.containers.list():
# c.kill()
client.close()
# client.api._custom_adapter.close()
for c in client.containers.list():
c.kill()
class Container:
'''
Wrapper around a ``docker.models.containers.Container`` to include
log capture and relay through our native logging system and helper
method(s) for cancellation/teardown.
# async def waitfor(
# cntr: Container,
# attr_path: tuple[str],
# expect=None,
# timeout: float = 0.5,
'''
def __init__(
self,
cntr: DockerContainer,
) -> None:
# ) -> Any:
# '''
# Wait for a container's attr value to be set. If ``expect`` is
# provided wait for the value to be set to that value.
self.cntr = cntr
# log msg de-duplication
self.seen_so_far = set()
# This is an async version of the helper from our ``pytest-dockerctl``
# plugin.
async def process_logs_until(
self,
patt: str,
bp_on_msg: bool = False,
) -> bool:
'''
Attempt to capture container log messages and relay through our
native logging system.
# '''
# def get(val, path):
# for key in path:
# val = val[key]
# return val
'''
seen_so_far = self.seen_so_far
while True:
logs = self.cntr.logs()
entries = logs.decode().split('\n')
for entry in entries:
# ignore null lines
if not entry:
continue
try:
record = json.loads(entry.strip())
except json.JSONDecodeError:
if 'Error' in entry:
raise RuntimeError(entry)
raise
msg = record['msg']
level = record['level']
if msg and entry not in seen_so_far:
seen_so_far.add(entry)
if bp_on_msg:
await tractor.breakpoint()
getattr(log, level, log.error)(f'{msg}')
if patt in msg:
return True
# do a checkpoint so we don't block if cancelled B)
await trio.sleep(0.01)
return False
def try_signal(
self,
signal: str = 'SIGINT',
) -> bool:
try:
# XXX: market store doesn't seem to shutdown nicely all the
# time with this (maybe because there are still open grpc
# connections?) noticeably after client connections have been
# made or are in use/teardown. It works just fine if you
# just start and stop the container tho?..
log.cancel(f'SENDING {signal} to {self.cntr.id}')
self.cntr.kill(signal)
return True
except docker.errors.APIError as err:
# _err = err
if 'is not running' in err.explanation:
return False
async def cancel(
self,
) -> None:
cid = self.cntr.id
self.try_signal('SIGINT')
with trio.move_on_after(0.5) as cs:
cs.shield = True
# print('PROCESSING LOGS')
await self.process_logs_until('initiating graceful shutdown')
# print('SHUTDOWN REPORTED BY CONTAINER')
await self.process_logs_until('exiting...',)
for _ in range(10):
with trio.move_on_after(0.5) as cs:
cs.shield = True
# print('waiting on EXITING')
await self.process_logs_until('exiting...',)
# print('got EXITING')
break
if cs.cancelled_caught:
# get out the big guns, bc apparently marketstore
# doesn't actually know how to terminate gracefully
# :eyeroll:...
self.try_signal('SIGKILL')
try:
log.info(f'Waiting on container shutdown: {cid}')
self.cntr.wait(
timeout=0.1,
condition='not-running',
)
break
except (
ReadTimeout,
ConnectionError,
):
log.error(f'failed to wait on container {cid}')
raise
else:
raise RuntimeError(f'Failed to cancel container {cid}')
log.cancel(f'Container stopped: {cid}')
# start = time.time()
# while time.time() - start < timeout:
# cntr.reload()
# val = get(cntr.attrs, attr_path)
# if expect is None and val:
# return val
# elif val == expect:
# return val
# else:
# raise TimeoutError("{} failed to be {}, value: \"{}\"".format(
# attr_path, expect if expect else 'not None', val))
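For orientation, a minimal usage sketch of the ``Container`` wrapper added above (hypothetical driver code, not part of this diff; assumes a local docker daemon and the JSON-line log format that ``process_logs_until()`` parses):

import docker
import trio

async def demo() -> None:
    client = docker.from_env()
    dcntr = client.containers.run(
        'alpacamarkets/marketstore:latest',  # image name as used below
        detach=True,
        init=True,
    )
    cntr = Container(dcntr)  # wrapper defined above

    # block (with a timeout) until the expected JSON log line appears;
    # each line should decode to a record with 'level' and 'msg' keys.
    with trio.move_on_after(2):
        await cntr.process_logs_until(
            'launching tcp listener for all services...',
        )

    # graceful teardown: SIGINT first, SIGKILL fallback handled inside.
    await cntr.cancel()

trio.run(demo)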
@tractor.context
@ -305,7 +208,7 @@ async def open_marketstored(
type='bind',
)
dcntr: DockerContainer = client.containers.run(
cntr: Container = client.containers.run(
'alpacamarkets/marketstore:latest',
# do we need this for cmds?
# '-i',
@ -317,64 +220,80 @@ async def open_marketstored(
},
mounts=[config_dir_mnt, data_dir_mnt],
detach=True,
# stop_signal='SIGINT',
stop_signal='SIGINT',
init=True,
# remove=True,
)
cntr = Container(dcntr)
try:
seen_so_far = set()
with trio.move_on_after(1):
found = await cntr.process_logs_until(
"launching tcp listener for all services...",
)
async def process_logs_until(
match: str,
bp_on_msg: bool = False,
):
logs = cntr.logs(stream=True)
for entry in logs:
entry = entry.decode()
if not found and cntr not in client.containers.list():
raise RuntimeError(
'Failed to start `marketstore`, check logs for details'
try:
record = json.loads(entry.strip())
except json.JSONDecodeError:
if 'Error' in entry:
raise RuntimeError(entry)
msg = record['msg']
level = record['level']
if msg and entry not in seen_so_far:
seen_so_far.add(entry)
if bp_on_msg:
await tractor.breakpoint()
getattr(log, level)(f'{msg}')
# if "launching tcp listener for all services..." in msg:
if match in msg:
return True
# do a checkpoint so we don't block if cancelled B)
await trio.sleep(0)
return False
with trio.move_on_after(0.5):
found = await process_logs_until(
"launching tcp listener for all services...",
)
await ctx.started((cntr.cntr.id, os.getpid()))
if not found and cntr not in client.containers.list():
raise RuntimeError(
'Failed to start `marketstore`, check logs for details'
)
# async with ctx.open_stream() as stream:
try:
# TODO: we might eventually want a proxy-style msg-prot here
# to allow remote control of containers without needing
# callers to have root perms?
await trio.sleep_forever()
# await cntr.cancel()
# with trio.CancelScope(shield=True):
# # block for the expected "teardown log msg"..
# # await cntr.process_logs_until('exiting...',)
# # only msg should be to signal killing the
# # container and this super daemon.
# msg = await stream.receive()
# # print("GOT CANCEL MSG")
# cid = msg['cancel']
# log.cancel(f'Cancelling container {cid}')
# # print("CANCELLING CONTAINER")
# await cntr.cancel()
# # print("SENDING ACK")
# await stream.send('ack')
await ctx.started(cntr.id)
await process_logs_until('exiting...',)
except (
BaseException,
# trio.Cancelled,
# KeyboardInterrupt,
):
with trio.CancelScope(shield=True):
await cntr.cancel()
# await stream.send('ack')
cntr.kill('SIGINT')
with trio.move_on_after(0.5) as cs:
cs.shield = True
await process_logs_until('exiting...',)
raise
finally:
try:
cntr.wait(
timeout=0.5,
condition='not-running',
)
except (
ReadTimeout,
ConnectionError,
):
cntr.kill()
async def start_ahab(
service_name: str,
@ -423,18 +342,9 @@ async def start_ahab(
open_marketstored,
) as (ctx, first):
cid, pid = first
assert str(first)
# run till cancelled
await trio.sleep_forever()
# async with ctx.open_stream() as stream:
# try:
# # run till cancelled
# await trio.sleep_forever()
# finally:
# with trio.CancelScope(shield=True):
# # print('SENDING CANCEL TO MARKETSTORED')
# await stream.send({'cancel': (cid, pid)})
# assert await stream.receive() == 'ack'
# since we demoted root perms in this parent
# we'll get a perms error on proc cleanup in

View File

@ -126,7 +126,7 @@ def ms_stream(
)
@click.argument('symbols', nargs=-1)
@click.pass_obj
def storesh(
def ms_shell(
config,
tl,
host,
@ -137,18 +137,43 @@ def storesh(
Start an IPython shell ready to query the local marketstore db.
'''
from piker.data.marketstore import tsdb_history_update
from piker.data.marketstore import backfill_history_diff
from piker._daemon import open_piker_runtime
async def main():
nonlocal symbols
async with open_piker_runtime(
'storesh',
'ms_shell',
enable_modules=['piker.data._ahab'],
):
symbol = symbols[0]
await tsdb_history_update(symbol)
try:
await backfill_history_diff()
except OSError:
# TODO: write magics to query marketstore
sym = symbols[0]
symbol, _, broker = sym.rpartition('.')
# (maybe) allocate shm array for this broker/symbol which will
# be used for fast near-term history capture and processing.
shm, opened = maybe_open_shm_array(
key=sym,
dtype=base_iohlc_dtype,
)
# load anything found in shm
from numpy.lib.recfunctions import structured_to_unstructured
mxmn = structured_to_unstructured(
shm.array[['low', 'high']],
# dtype=[('mxmn', '<f8'), ('index', '<i8')],
).flatten()
from piker.ui._compression import downsample
xd, yd = downsample(
y=mxmn,
x=np.arange(len(mxmn)),
bins=4,
)
await tractor.breakpoint()
trio.run(main)
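For reference, the ``low``/``high`` flattening used in the shell above yields an interleaved min/max series; a tiny standalone sketch with made-up values:

import numpy as np
from numpy.lib.recfunctions import structured_to_unstructured

# two bars worth of (low, high) rows flatten into an interleaved 1-d
# series, which is what gets handed to the downsampler.
bars = np.array(
    [(1.0, 2.0), (3.0, 4.0)],
    dtype=[('low', 'f8'), ('high', 'f8')],
)
mxmn = structured_to_unstructured(bars[['low', 'high']]).flatten()
assert mxmn.tolist() == [1.0, 2.0, 3.0, 4.0]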
@ -159,11 +184,10 @@ def storesh(
@click.argument('name', nargs=1, required=True)
@click.pass_obj
def ingest(config, name, test_file, tl):
'''
Ingest real-time broker quotes and ticks to a marketstore instance.
'''
"""Ingest real-time broker quotes and ticks to a marketstore instance.
"""
# global opts
brokermods = config['brokermods']
loglevel = config['loglevel']
tractorloglevel = config['tractorloglevel']
# log = config['log']

View File

@ -22,6 +22,7 @@ This module is enabled for ``brokerd`` daemons.
"""
from __future__ import annotations
from dataclasses import dataclass, field
from datetime import datetime
from contextlib import asynccontextmanager
from functools import partial
from types import ModuleType
@ -36,7 +37,6 @@ from trio.abc import ReceiveChannel
from trio_typing import TaskStatus
import tractor
from pydantic import BaseModel
import numpy as np
from ..brokers import get_brokermod
from .._cacheables import maybe_open_context
@ -49,6 +49,7 @@ from ._sharedmem import (
maybe_open_shm_array,
attach_shm_array,
ShmArray,
_secs_in_day,
)
from .ingest import get_ingestormod
from ._source import (
@ -235,150 +236,125 @@ async def manage_history(
# we expect the sub-actor to write
readonly=False,
)
# TODO: history validation
if not opened:
raise RuntimeError(
"Persistent shm for sym was already open?!"
)
log.info('Scanning for existing `marketstored`')
is_up = await check_for_service('marketstored')
# for now only do backfilling if no tsdb can be found
do_legacy_backfill = not is_up and opened
open_history_client = getattr(mod, 'open_history_client', None)
if is_up and opened and open_history_client:
do_backfill = not is_up and opened
if is_up and opened:
log.info('Found existing `marketstored`')
from . import marketstore
async with marketstore.open_storage_client(
fqsn,
) as storage:
) as (storage, tsdb_arrays):
tsdb_arrays = await storage.read_ohlcv(fqsn)
# TODO: get this shit workin
from tractor.trionics import ipython_embed
await ipython_embed()
# await ipython_embed(ns=locals())
if not tsdb_arrays:
do_legacy_backfill = True
# TODO: history validation
# assert opened, f'Persistent shm for {symbol} was already open?!'
# if not opened:
# raise RuntimeError(
# "Persistent shm for sym was already open?!"
# )
else:
if tsdb_arrays:
log.info(f'Loaded tsdb history {tsdb_arrays}')
fastest = list(tsdb_arrays[fqsn].values())[0]
last_s = fastest['Epoch'][-1]
fastest = list(tsdb_arrays.values())[0]
times = fastest['Epoch']
first, last = times[0], times[-1]
first_tsdb_dt, last_tsdb_dt = map(
pendulum.from_timestamp, [first, last]
# TODO: see if there's faster multi-field reads:
# https://numpy.org/doc/stable/user/basics.rec.html#accessing-multiple-fields
# re-index with a `time` and index field
shm.push(
fastest[-3 * _secs_in_day:],
# insert the history a "day's worth" of samples before the
# buffer's end to leave some real-time buffer space.
prepend=True,
start=shm._len - _secs_in_day,
field_map={
'Epoch': 'time',
'Open': 'open',
'High': 'high',
'Low': 'low',
'Close': 'close',
'Volume': 'volume',
},
)
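As an aside, the column re-mapping requested via ``field_map`` above can be illustrated with plain ``numpy`` (a sketch only; the actual rename is assumed to happen inside ``ShmArray.push()``):

import numpy as np
from numpy.lib.recfunctions import rename_fields

# marketstore-style OHLCV rows..
mkts_rows = np.zeros(
    3,
    dtype=[
        ('Epoch', 'i8'), ('Open', 'f8'), ('High', 'f8'),
        ('Low', 'f8'), ('Close', 'f8'), ('Volume', 'f8'),
    ],
)
# ..renamed to the shm buffer's field names.
renamed = rename_fields(mkts_rows, {
    'Epoch': 'time',
    'Open': 'open',
    'High': 'high',
    'Low': 'low',
    'Close': 'close',
    'Volume': 'volume',
})
assert renamed.dtype.names == (
    'time', 'open', 'high', 'low', 'close', 'volume',
)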
# TODO: this should be used verbatim for the pure
# shm backfiller approach below.
def diff_history(
array,
start_dt,
end_dt,
) -> np.ndarray:
s_diff = (last_tsdb_dt - start_dt).seconds
# if we detect a partial frame's worth of data
# that is new, slice out only that history and
# write to shm.
if s_diff > 0:
assert last_tsdb_dt > start_dt
selected = array['time'] > last_tsdb_dt.timestamp()
to_push = array[selected]
log.info(
f'Pushing partial frame {to_push.size} to shm'
)
return to_push
else:
return array
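To make the slicing in ``diff_history()`` concrete, a tiny worked example with hypothetical epoch values:

import numpy as np

# a freshly queried frame and the newest timestamp already in the tsdb
frame = np.array(
    [(100.0,), (160.0,), (220.0,)],
    dtype=[('time', 'f8')],
)
last_tsdb_ts = 160.0

# keep only the strictly-newer rows (mirrors the branch above)
to_push = frame[frame['time'] > last_tsdb_ts]
assert to_push['time'].tolist() == [220.0]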
# start history analysis and load missing new data via the backend.
async with open_history_client(fqsn) as hist:
async with mod.open_history_client(fqsn) as hist:
# get latest query's worth of history all the way
# back to what is recorded in the tsdb
array, start_dt, end_dt = await hist(end_dt='')
to_push = diff_history(array, start_dt, end_dt)
shm.push(to_push)
# get latest query's worth of history
array, next_dt = await hist(end_dt='')
# let caller unblock and deliver latest history frame
task_status.started(shm)
some_data_ready.set()
last_dt = datetime.fromtimestamp(last_s)
array, next_dt = await hist(end_dt=last_dt)
else:
do_backfill = True
# pull new history frames until we hit latest
# already in the tsdb
while start_dt > last_tsdb_dt:
array, start_dt, end_dt = await hist(end_dt=start_dt)
to_push = diff_history(array, start_dt, end_dt)
shm.push(to_push, prepend=True)
# await tractor.breakpoint()
# TODO: see if there's faster multi-field reads:
# https://numpy.org/doc/stable/user/basics.rec.html#accessing-multiple-fields
# re-index with a `time` and index field
shm.push(
fastest[-shm._first.value:],
some_data_ready.set()
# insert the history a "day's worth" of samples before the
# buffer's end to leave some real-time buffer space.
prepend=True,
# start=shm._len - _secs_in_day,
field_map={
'Epoch': 'time',
'Open': 'open',
'High': 'high',
'Low': 'low',
'Close': 'close',
'Volume': 'volume',
},
)
# TODO: write new data to tsdb to be ready for the next
# read.
if do_legacy_backfill:
# do a legacy incremental backfill from the provider.
if do_backfill:
log.info('No existing `marketstored` found..')
bfqsn = fqsn.replace('.' + mod.name, '')
# start the history backfill task; ``backfill_bars()`` is
# a required backend func and must block until shm is
# filled with the first set of ohlc bars
await bus.nursery.start(
start_backfill,
mod,
bfqsn,
fqsn,
shm,
)
# yield back after client connect with filled shm
task_status.started(shm)
# _ = await bus.nursery.start(mod.backfill_bars, fqsn, shm)
# indicate to caller that feed can be delivered to
# remote requesting client since we've loaded history
# data that can be used.
some_data_ready.set()
# yield back after client connect with filled shm
task_status.started(shm)
# indicate to caller that feed can be delivered to
# remote requesting client since we've loaded history
# data that can be used.
some_data_ready.set()
# detect sample step size for sampled historical data
times = shm.array['time']
delay_s = times[-1] - times[times != times[-1]][-1]
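The expression above derives the sampling period as the gap between the newest timestamp and the most recent *distinct* one before it; for example, with illustrative values:

import numpy as np

# the last bar's stamp can repeat while it is still being filled, so
# compare against the latest timestamp that differs from it.
times = np.array([60., 120., 180., 240., 240.])
delay_s = times[-1] - times[times != times[-1]][-1]
assert delay_s == 60.0  # -> 1-minute sampling detected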
# begin real-time updates of shm and tsdb once the feed
# goes live.
await feed_is_live.wait()
if opened:
sampler.ohlcv_shms.setdefault(delay_s, []).append(shm)
# start shm incrementing for OHLC sampling at the current
# detected sampling period if one doesn't already exist.
if sampler.incrementers.get(delay_s) is None:
await bus.start_task(
increment_ohlc_buffer,
delay_s,
)
# history retrieval loop depending on user interaction and thus
# a small RPC-prot for remotely controlling what data is loaded
# for viewing.
await trio.sleep_forever()
# cs.cancel()
async def allocate_persistent_feed(
bus: _FeedsBus,
brokername: str,
symbol: str,
loglevel: str,
start_stream: bool = True,
@ -398,7 +374,6 @@ async def allocate_persistent_feed(
- a real-time streaming task which connec
'''
# load backend module
try:
mod = get_brokermod(brokername)
except ImportError:
@ -441,7 +416,7 @@ async def allocate_persistent_feed(
manage_history,
mod,
bus,
'.'.join((bfqsn, brokername)),
bfqsn,
some_data_ready,
feed_is_live,
)
@ -454,11 +429,9 @@ async def allocate_persistent_feed(
# true fqsn
fqsn = '.'.join((bfqsn, brokername))
# add a fqsn entry that includes the ``.<broker>`` suffix
# and an entry that includes the broker-specific fqsn (including
# any new suffixes or elements as injected by the backend).
init_msg[fqsn] = msg
init_msg[bfqsn] = msg
# TODO: pretty sure we don't need this? why not just leave 1s as
# the fastest "sample period" since we'll probably always want that
@ -472,14 +445,13 @@ async def allocate_persistent_feed(
await some_data_ready.wait()
# append ``.<broker>`` suffix to each quote symbol
acceptable_not_fqsn_with_broker_suffix = symbol + f'.{brokername}'
bsym = symbol + f'.{brokername}'
generic_first_quotes = {
acceptable_not_fqsn_with_broker_suffix: first_quote,
bsym: first_quote,
fqsn: first_quote,
}
bus.feeds[symbol] = bus.feeds[bfqsn] = (
bus.feeds[symbol] = bus.feeds[fqsn] = (
init_msg,
generic_first_quotes,
)
@ -492,22 +464,9 @@ async def allocate_persistent_feed(
if not start_stream:
await trio.sleep_forever()
# begin real-time updates of shm and tsdb once the feed goes live and
# the backend will indicate when real-time quotes have begun.
# backend will indicate when real-time quotes have begun.
await feed_is_live.wait()
# start shm incrementer task for OHLC style sampling
# at the current detected step period.
times = shm.array['time']
delay_s = times[-1] - times[times != times[-1]][-1]
sampler.ohlcv_shms.setdefault(delay_s, []).append(shm)
if sampler.incrementers.get(delay_s) is None:
await bus.start_task(
increment_ohlc_buffer,
delay_s,
)
sum_tick_vlm: bool = init_msg.get(
'shm_write_opts', {}
).get('sum_tick_vlm', True)
@ -530,7 +489,7 @@ async def open_feed_bus(
ctx: tractor.Context,
brokername: str,
symbol: str, # normally expected to the broker-specific fqsn
symbol: str,
loglevel: str,
tick_throttle: Optional[float] = None,
start_stream: bool = True,
@ -552,9 +511,7 @@ async def open_feed_bus(
# TODO: check for any stale shm entries for this symbol
# (after we also group them in a nice `/dev/shm/piker/` subdir).
# ensure we are who we think we are
servicename = tractor.current_actor().name
assert 'brokerd' in servicename
assert brokername in servicename
assert 'brokerd' in tractor.current_actor().name
bus = get_feed_bus(brokername)
@ -564,7 +521,7 @@ async def open_feed_bus(
entry = bus.feeds.get(symbol)
if entry is None:
# allocate a new actor-local stream bus which
# will persist for this `brokerd`'s service lifetime.
# will persist for this `brokerd`.
async with bus.task_lock:
await bus.nursery.start(
partial(
@ -588,12 +545,12 @@ async def open_feed_bus(
init_msg, first_quotes = bus.feeds[symbol]
msg = init_msg[symbol]
bfqsn = msg['fqsn'].lower()
bfqsn = msg['fqsn']
# true fqsn
fqsn = '.'.join([bfqsn, brokername])
assert fqsn in first_quotes
assert bus.feeds[bfqsn]
assert bus.feeds[fqsn]
# broker-ambiguous symbol (provided on cli - eg. mnq.globex.ib)
bsym = symbol + f'.{brokername}'
@ -907,10 +864,7 @@ async def maybe_open_feed(
**kwargs,
) -> (
Feed,
ReceiveChannel[dict[str, Any]],
):
) -> (Feed, ReceiveChannel[dict[str, Any]]):
'''
Maybe open a data feed to a ``brokerd`` daemon only if there is no
local one for the broker-symbol pair; if one is cached use it wrapped
@ -931,7 +885,6 @@ async def maybe_open_feed(
'start_stream': kwargs.get('start_stream', True),
},
key=fqsn,
) as (cache_hit, feed):
if cache_hit:

View File

@ -29,13 +29,14 @@ from typing import (
Any,
Optional,
Union,
# Callable,
# TYPE_CHECKING,
)
import time
from math import isnan
from bidict import bidict
import msgpack
import pyqtgraph as pg
import numpy as np
import pandas as pd
import tractor
@ -48,8 +49,15 @@ from anyio_marketstore import (
import purerpc
from .feed import maybe_open_feed
from ._source import (
mk_fqsn,
# Symbol,
)
from ..log import get_logger, get_console_log
# if TYPE_CHECKING:
# from ._sharedmem import ShmArray
log = get_logger(__name__)
@ -227,16 +235,6 @@ class Storage:
# series' cache from tsdb reads
self._arrays: dict[str, np.ndarray] = {}
async def list_keys(self) -> list[str]:
return await self.client.list_symbols()
async def search_keys(self, pattern: str) -> list[str]:
'''
Search for time series key in the storage backend.
'''
...
async def write_ticks(self, ticks: list) -> None:
...
@ -258,29 +256,14 @@ class Storage:
if fqsn not in syms:
return {}
tfstr = tf_in_1s[1]
params = Params(
symbols=fqsn,
timeframe=tfstr,
attrgroup='OHLCV',
# limit_from_start=True,
# TODO: figure out the max limit here given the
# ``purerpc`` msg size limit of 33554432
limit=int(800e3),
)
if timeframe is None:
log.info(f'starting {fqsn} tsdb granularity scan..')
# loop through and try to find highest granularity
for tfstr in tf_in_1s.values():
try:
log.info(f'querying for {tfstr}@{fqsn}')
params.set('timeframe', tfstr)
result = await client.query(params)
result = await client.query(Params(fqsn, tfstr, 'OHLCV',))
break
except purerpc.grpclib.exceptions.UnknownError:
# XXX: this is already logged by the container and
# thus shows up through `marketstored` logs relay.
@ -290,11 +273,9 @@ class Storage:
return {}
else:
result = await client.query(params)
tfstr = tf_in_1s[timeframe]
result = await client.query(Params(fqsn, tfstr, 'OHLCV',))
# TODO: it turns out column access on recarrays is actually slower:
# https://jakevdp.github.io/PythonDataScienceHandbook/02.09-structured-data-numpy.html#RecordArrays:-Structured-Arrays-with-a-Twist
# it might make sense to make these structured arrays?
# Fill out a `numpy` array-results map
arrays = {}
for fqsn, data_set in result.by_symbols().items():
@ -302,22 +283,7 @@ class Storage:
tf_in_1s.inverse[data_set.timeframe]
] = data_set.array
return arrays[fqsn][timeframe] if timeframe else arrays[fqsn]
async def delete_ts(
self,
key: str,
timeframe: Optional[Union[int, str]] = None,
) -> bool:
client = self.client
syms = await client.list_symbols()
print(syms)
# if key not in syms:
# raise KeyError(f'`{fqsn}` table key not found?')
return await client.destroy(tbk=key)
return arrays[fqsn][timeframe] if timeframe else arrays
@acm
@ -330,16 +296,19 @@ async def open_storage_client(
Load a series by key and deliver in ``numpy`` struct array format.
'''
async with (
# eventually a storage backend endpoint
get_client() as client,
):
# slap on our wrapper api
yield Storage(client)
async with get_client() as client:
storage_client = Storage(client)
arrays = await storage_client.read_ohlcv(
fqsn,
period,
)
yield storage_client, arrays
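A hedged usage sketch of the reworked ``open_storage_client()`` entry point above (assumes a reachable ``marketstored`` instance; the symbol name is only an example):

import trio

async def show_history(fqsn: str = 'mnq.globex.ib') -> None:
    # open the storage client and any pre-existing OHLCV history
    async with open_storage_client(fqsn) as (storage, arrays):
        if arrays:
            print(f'found {len(arrays)} timeframe(s) for {fqsn}')

        # list whatever symbols the tsdb already knows about
        syms = await storage.client.list_symbols()
        print(f'existing tsdb symbols: {syms}')

trio.run(show_history)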
async def tsdb_history_update(
fqsn: str,
async def backfill_history_diff(
# symbol: Symbol
) -> list[str]:
@ -369,95 +338,108 @@ async def tsdb_history_update(
# * the original data feed arch blurb:
# - https://github.com/pikers/piker/issues/98
#
profiler = pg.debug.Profiler(
disabled=False, # not pg_profile_enabled(),
delayed=False,
)
broker = 'ib'
symbol = 'mnq.globex'
# broker = 'binance'
# symbol = 'btcusdt'
fqsn = mk_fqsn(broker, symbol)
async with (
open_storage_client(fqsn) as storage,
get_client() as client,
maybe_open_feed(
[fqsn],
broker,
[symbol],
loglevel='info',
# backpressure=False,
start_stream=False,
) as (feed, stream),
):
profiler(f'opened feed for {fqsn}')
symbol = feed.symbols.get(fqsn)
if symbol:
fqsn = symbol.front_fqsn()
syms = await storage.client.list_symbols()
log.info(f'Existing tsdb symbol set:\n{pformat(syms)}')
profiler(f'listed symbols {syms}')
syms = await client.list_symbols()
log.info(f'Existing symbol set:\n{pformat(syms)}')
# diff db history with shm and only write the missing portions
ohlcv = feed.shm.array
# TODO: use pg profiler
tsdb_arrays = await storage.read_ohlcv(fqsn)
key = (fqsn, '1Sec', 'OHLCV')
tbk = mk_tbk(key)
to_append = feed.shm.array
to_prepend = None
# diff vs. existing array and append new history
# TODO:
# hist diffing
if tsdb_arrays:
onesec = tsdb_arrays[1]
to_append = ohlcv[ohlcv['time'] > onesec['Epoch'][-1]]
to_prepend = ohlcv[ohlcv['time'] < onesec['Epoch'][0]]
# TODO: should be no error?
# assert not resp.responses
profiler('Finished db arrays diffs')
start = time.time()
for array in [to_append, to_prepend]:
if array is None:
continue
qr = await client.query(
# Params(fqsn, '1Sec`', 'OHLCV',)
Params(*key),
)
# # Dig out `numpy` results map
arrays: dict[tuple[str, int], np.ndarray] = {}
for name, data_set in qr.by_symbols().items():
in_secs = tf_in_1s.inverse[data_set.timeframe]
arrays[(name, in_secs)] = data_set.array
log.info(
f'Writing datums {array.size} -> to tsdb from shm\n'
)
s1 = arrays[(fqsn, 1)]
to_append = ohlcv[ohlcv['time'] > s1['Epoch'][-1]]
# build mkts schema compat array for writing
mkts_dt = np.dtype(_ohlcv_dt)
mkts_array = np.zeros(
len(array),
dtype=mkts_dt,
)
# copy from shm array (yes it's this easy):
# https://numpy.org/doc/stable/user/basics.rec.html#assignment-from-other-structured-arrays
mkts_array[:] = array[[
'time',
'open',
'high',
'low',
'close',
'volume',
]]
end_diff = time.time()
diff_ms = round((end_diff - start) * 1e3, ndigits=2)
# write to db
resp = await storage.client.write(
mkts_array,
tbk=f'{fqsn}/1Sec/OHLCV',
log.info(
f'Appending {to_append.size} datums to tsdb from shm\n'
f'Total diff time: {diff_ms} ms'
)
# NOTE: this will append duplicates
# for the same timestamp-index.
# TODO: pre deduplicate?
isvariablelength=True,
)
# build mkts schema compat array for writing
mkts_dt = np.dtype(_ohlcv_dt)
mkts_array = np.zeros(
len(to_append),
dtype=mkts_dt,
)
# copy from shm array (yes it's this easy):
# https://numpy.org/doc/stable/user/basics.rec.html#assignment-from-other-structured-arrays
mkts_array[:] = to_append[[
'time',
'open',
'high',
'low',
'close',
'volume',
]]
log.info(
f'Wrote {to_append.size} datums to tsdb\n'
)
profiler('Finished db writes')
# write to db
resp = await client.write(
mkts_array,
tbk=tbk,
# NOTE: this will append duplicates
# for the same timestamp-index.
isvariablelength=True,
)
end_write = time.time()
diff_ms = round((end_write - end_diff) * 1e3, ndigits=2)
log.info(
f'Wrote {to_append.size} datums to tsdb\n'
f'Total write time: {diff_ms} ms'
)
for resp in resp.responses:
err = resp.error
if err:
raise MarketStoreError(err)
for resp in resp.responses:
err = resp.error
if err:
raise MarketStoreError(err)
from tractor.trionics import ipython_embed
await ipython_embed()
# TODO: backfiller loop
from piker.ui._compression import downsample
x, y = downsample(
s1['Epoch'],
s1['Close'],
bins=10,
)
await tractor.breakpoint()
async def ingest_quote_stream(

View File

@ -7,7 +7,3 @@
# pin this to a dev branch that we have more control over especially
# as more graphics stuff gets hashed out.
-e git+https://github.com/pikers/pyqtgraph.git@piker_pin#egg=pyqtgraph
# our async client for ``marketstore`` (the tsdb)
-e git+https://github.com/pikers/anyio-marketstore.git@master#egg=anyio-marketstore

View File

@ -77,14 +77,6 @@ setup(
# tsdbs
'pymarketstore',
],
extras_require={
# serialization
'tsdb': [
'docker',
],
},
tests_require=['pytest'],
python_requires=">=3.9", # literally for ``datetime.datetime.fromisoformat``...
keywords=["async", "trading", "finance", "quant", "charting"],