Compare commits


No commits in common. "381fa1145822c05f242bbfe97e1b0daf8be5dc80" and "94cba54beb85f86f030a41e021d0d2518a07e1b6" have entirely different histories.

6 changed files with 318 additions and 461 deletions

View File

@@ -24,13 +24,14 @@ from typing import (
     # Any,
 )
 from contextlib import asynccontextmanager as acm
+# import time

 import trio
 from trio_typing import TaskStatus
 import tractor
 import docker
 import json
-from docker.models.containers import Container as DockerContainer
+from docker.models.containers import Container
 from docker.errors import DockerException, APIError
 from requests.exceptions import ConnectionError, ReadTimeout
@@ -96,7 +97,6 @@ async def open_docker(
             base_url=url,
             **kwargs
         ) if url else docker.from_env(**kwargs)
-
         yield client
     except (
@@ -127,140 +127,43 @@ async def open_docker(
     finally:
         if client:
-            # for c in client.containers.list():
-            #     c.kill()
             client.close()
             # client.api._custom_adapter.close()
+
+        for c in client.containers.list():
+            c.kill()

-
-class Container:
-    '''
-    Wrapper around a ``docker.models.containers.Container`` to include
-    log capture and relay through our native logging system and helper
-    method(s) for cancellation/teardown.
-
-    '''
-    def __init__(
-        self,
-        cntr: DockerContainer,
-    ) -> None:
-
-        self.cntr = cntr
-        # log msg de-duplication
-        self.seen_so_far = set()
-
-    async def process_logs_until(
-        self,
-        patt: str,
-        bp_on_msg: bool = False,
-    ) -> bool:
-        '''
-        Attempt to capture container log messages and relay through our
-        native logging system.
-
-        '''
-        seen_so_far = self.seen_so_far
-
-        while True:
-            logs = self.cntr.logs()
-            entries = logs.decode().split('\n')
-            for entry in entries:
-
-                # ignore null lines
-                if not entry:
-                    continue
-
-                try:
-                    record = json.loads(entry.strip())
-                except json.JSONDecodeError:
-                    if 'Error' in entry:
-                        raise RuntimeError(entry)
-                    raise
-
-                msg = record['msg']
-                level = record['level']
-                if msg and entry not in seen_so_far:
-                    seen_so_far.add(entry)
-                    if bp_on_msg:
-                        await tractor.breakpoint()
-
-                    getattr(log, level, log.error)(f'{msg}')
-
-                if patt in msg:
-                    return True
-
-            # do a checkpoint so we don't block if cancelled B)
-            await trio.sleep(0.01)
-
-        return False
-
-    def try_signal(
-        self,
-        signal: str = 'SIGINT',
-    ) -> bool:
-        try:
-            # XXX: market store doesn't seem to shutdown nicely all the
-            # time with this (maybe because there are still open grpc
-            # connections?) noticably after client connections have been
-            # made or are in use/teardown. It works just fine if you
-            # just start and stop the container tho?..
-            log.cancel(f'SENDING {signal} to {self.cntr.id}')
-            self.cntr.kill(signal)
-            return True
-
-        except docker.errors.APIError as err:
-            # _err = err
-            if 'is not running' in err.explanation:
-                return False
-
-    async def cancel(
-        self,
-    ) -> None:
-
-        cid = self.cntr.id
-        self.try_signal('SIGINT')
-
-        with trio.move_on_after(0.5) as cs:
-            cs.shield = True
-            # print('PROCESSINGN LOGS')
-            await self.process_logs_until('initiating graceful shutdown')
-            # print('SHUTDOWN REPORTED BY CONTAINER')
-            await self.process_logs_until('exiting...',)
-
-        for _ in range(10):
-            with trio.move_on_after(0.5) as cs:
-                cs.shield = True
-                # print('waiting on EXITING')
-                await self.process_logs_until('exiting...',)
-                # print('got EXITING')
-                break
-
-            if cs.cancelled_caught:
-                # get out the big guns, bc apparently marketstore
-                # doesn't actually know how to terminate gracefully
-                # :eyeroll:...
-                self.try_signal('SIGKILL')
-
-                try:
-                    log.info('Waiting on container shutdown: {cid}')
-                    self.cntr.wait(
-                        timeout=0.1,
-                        condition='not-running',
-                    )
-                    break
-
-                except (
-                    ReadTimeout,
-                    ConnectionError,
-                ):
-                    log.error(f'failed to wait on container {cid}')
-                    raise
-
-        else:
-            raise RuntimeError('Failed to cancel container {cid}')
-
-        log.cancel(f'Container stopped: {cid}')
+# async def waitfor(
+#     cntr: Container,
+#     attr_path: tuple[str],
+#     expect=None,
+#     timeout: float = 0.5,
+
+# ) -> Any:
+#     '''
+#     Wait for a container's attr value to be set. If ``expect`` is
+#     provided wait for the value to be set to that value.
+
+#     This is an async version of the helper from our ``pytest-dockerctl``
+#     plugin.
+
+#     '''
+#     def get(val, path):
+#         for key in path:
+#             val = val[key]
+#         return val
+
+#     start = time.time()
+#     while time.time() - start < timeout:
+#         cntr.reload()
+#         val = get(cntr.attrs, attr_path)
+#         if expect is None and val:
+#             return val
+#         elif val == expect:
+#             return val
+#     else:
+#         raise TimeoutError("{} failed to be {}, value: \"{}\"".format(
+#             attr_path, expect if expect else 'not None', val))


 @tractor.context
@@ -305,7 +208,7 @@ async def open_marketstored(
         type='bind',
     )

-    dcntr: DockerContainer = client.containers.run(
+    cntr: Container = client.containers.run(
         'alpacamarkets/marketstore:latest',
         # do we need this for cmds?
         # '-i',
@@ -317,14 +220,46 @@ async def open_marketstored(
         },
         mounts=[config_dir_mnt, data_dir_mnt],
         detach=True,
-        # stop_signal='SIGINT',
+        stop_signal='SIGINT',
         init=True,
         # remove=True,
     )
-    cntr = Container(dcntr)
-
-    with trio.move_on_after(1):
-        found = await cntr.process_logs_until(
+    try:
+        seen_so_far = set()
+
+        async def process_logs_until(
+            match: str,
+            bp_on_msg: bool = False,
+        ):
+            logs = cntr.logs(stream=True)
+            for entry in logs:
+                entry = entry.decode()
+                try:
+                    record = json.loads(entry.strip())
+                except json.JSONDecodeError:
+                    if 'Error' in entry:
+                        raise RuntimeError(entry)
+
+                msg = record['msg']
+                level = record['level']
+                if msg and entry not in seen_so_far:
+                    seen_so_far.add(entry)
+                    if bp_on_msg:
+                        await tractor.breakpoint()
+                    getattr(log, level)(f'{msg}')
+
+                # if "launching tcp listener for all services..." in msg:
+                if match in msg:
+                    return True
+
+                # do a checkpoint so we don't block if cancelled B)
+                await trio.sleep(0)
+
+            return False
+
+        with trio.move_on_after(0.5):
+            found = await process_logs_until(
                 "launching tcp listener for all services...",
             )
@@ -333,48 +268,32 @@ async def open_marketstored(
                 'Failed to start `marketstore` check logs deats'
             )

-    await ctx.started((cntr.cntr.id, os.getpid()))
-
-    # async with ctx.open_stream() as stream:
-
-    try:
-
-        # TODO: we might eventually want a proxy-style msg-prot here
-        # to allow remote control of containers without needing
-        # callers to have root perms?
-        await trio.sleep_forever()
-
-        # await cntr.cancel()
-        # with trio.CancelScope(shield=True):
-        #     # block for the expected "teardown log msg"..
-        #     # await cntr.process_logs_until('exiting...',)
-
-        #     # only msg should be to signal killing the
-        #     # container and this super daemon.
-        #     msg = await stream.receive()
-        #     # print("GOT CANCEL MSG")
-
-        #     cid = msg['cancel']
-        #     log.cancel(f'Cancelling container {cid}')
-        #     # print("CANCELLING CONTAINER")
-        #     await cntr.cancel()
-
-        #     # print("SENDING ACK")
-        #     await stream.send('ack')
+        await ctx.started(cntr.id)
+        await process_logs_until('exiting...',)

     except (
         BaseException,
         # trio.Cancelled,
         # KeyboardInterrupt,
     ):
-        with trio.CancelScope(shield=True):
-            await cntr.cancel()
-        # await stream.send('ack')
+        cntr.kill('SIGINT')
+        with trio.move_on_after(0.5) as cs:
+            cs.shield = True
+            await process_logs_until('exiting...',)
         raise

+    finally:
+        try:
+            cntr.wait(
+                timeout=0.5,
+                condition='not-running',
+            )
+        except (
+            ReadTimeout,
+            ConnectionError,
+        ):
+            cntr.kill()


 async def start_ahab(
     service_name: str,
@@ -423,18 +342,9 @@ async def start_ahab(
                 open_marketstored,

             ) as (ctx, first):

-                cid, pid = first
-
+                assert str(first)
+                # run till cancelled
                 await trio.sleep_forever()

-                # async with ctx.open_stream() as stream:
-                #     try:
-                #         # run till cancelled
-                #         await trio.sleep_forever()
-
-                #     finally:
-                #         with trio.CancelScope(shield=True):
-                #             # print('SENDING CANCEL TO MARKETSTORED')
-                #             await stream.send({'cancel': (cid, pid)})
-                #             assert await stream.receive() == 'ack'
-
         # since we demoted root perms in this parent
         # we'll get a perms error on proc cleanup in

View File

@@ -126,7 +126,7 @@ def ms_stream(
 )
 @click.argument('symbols', nargs=-1)
 @click.pass_obj
-def storesh(
+def ms_shell(
     config,
     tl,
     host,
@@ -137,18 +137,43 @@ def storesh(
     Start an IPython shell ready to query the local marketstore db.

     '''
-    from piker.data.marketstore import tsdb_history_update
+    from piker.data.marketstore import backfill_history_diff
     from piker._daemon import open_piker_runtime

     async def main():
         nonlocal symbols

         async with open_piker_runtime(
-            'storesh',
+            'ms_shell',
             enable_modules=['piker.data._ahab'],
         ):
-            symbol = symbols[0]
-            await tsdb_history_update(symbol)
+            try:
+                await backfill_history_diff()
+            except OSError:
+                # TODO: write magics to query marketstore
+                sym = symbols[0]
+                symbol, _, broker = sym.rpartition('.')
+
+                # (maybe) allocate shm array for this broker/symbol which will
+                # be used for fast near-term history capture and processing.
+                shm, opened = maybe_open_shm_array(
+                    key=sym,
+                    dtype=base_iohlc_dtype,
+                )
+
+                # load anything found in shm
+                from numpy.lib.recfunctions import structured_to_unstructured
+                mxmn = structured_to_unstructured(
+                    shm.array[['low', 'high']],
+                    # dtype=[('mxmn', '<f8'), ('index', '<i8')],
+                ).flatten()
+
+                from piker.ui._compression import downsample
+                xd, yd = downsample(
+                    y=mxmn,
+                    x=np.arange(len(mxmn)),
+                    bins=4,
+                )
+                await tractor.breakpoint()

     trio.run(main)
@@ -159,11 +184,10 @@ def storesh(
 @click.argument('name', nargs=1, required=True)
 @click.pass_obj
 def ingest(config, name, test_file, tl):
-    '''
-    Ingest real-time broker quotes and ticks to a marketstore instance.
-
-    '''
+    """Ingest real-time broker quotes and ticks to a marketstore instance.
+    """
     # global opts
-    brokermods = config['brokermods']
     loglevel = config['loglevel']
     tractorloglevel = config['tractorloglevel']
     # log = config['log']

View File

@@ -22,6 +22,7 @@ This module is enabled for ``brokerd`` daemons.
 """
 from __future__ import annotations
 from dataclasses import dataclass, field
+from datetime import datetime
 from contextlib import asynccontextmanager
 from functools import partial
 from types import ModuleType
@@ -36,7 +37,6 @@ from trio.abc import ReceiveChannel
 from trio_typing import TaskStatus
 import tractor
 from pydantic import BaseModel
-import numpy as np

 from ..brokers import get_brokermod
 from .._cacheables import maybe_open_context
@@ -49,6 +49,7 @@ from ._sharedmem import (
     maybe_open_shm_array,
     attach_shm_array,
     ShmArray,
+    _secs_in_day,
 )
 from .ingest import get_ingestormod
 from ._source import (
@@ -235,102 +236,50 @@ async def manage_history(
         # we expect the sub-actor to write
         readonly=False,
     )

-    # TODO: history validation
-    if not opened:
-        raise RuntimeError(
-            "Persistent shm for sym was already open?!"
-        )
-
     log.info('Scanning for existing `marketstored`')

     is_up = await check_for_service('marketstored')

     # for now only do backfilling if no tsdb can be found
-    do_legacy_backfill = not is_up and opened
-
-    open_history_client = getattr(mod, 'open_history_client', None)
+    do_backfill = not is_up and opened

-    if is_up and opened and open_history_client:
+    if is_up and opened:
         log.info('Found existing `marketstored`')

         from . import marketstore
         async with marketstore.open_storage_client(
             fqsn,
-        ) as storage:
-
-            tsdb_arrays = await storage.read_ohlcv(fqsn)
-
-            if not tsdb_arrays:
-                do_legacy_backfill = True
-
-            else:
+        ) as (storage, tsdb_arrays):
+            # TODO: get this shit workin
+            from tractor.trionics import ipython_embed
+            await ipython_embed()
+            # await ipython_embed(ns=locals())
+
+            # TODO: history validation
+            # assert opened, f'Persistent shm for {symbol} was already open?!'
+            # if not opened:
+            #     raise RuntimeError(
+            #         "Persistent shm for sym was already open?!"
+            #     )
+
+            if tsdb_arrays:
                 log.info(f'Loaded tsdb history {tsdb_arrays}')

-                fastest = list(tsdb_arrays.values())[0]
-                times = fastest['Epoch']
-                first, last = times[0], times[-1]
-                first_tsdb_dt, last_tsdb_dt = map(
-                    pendulum.from_timestamp, [first, last]
-                )
-
-                # TODO: this should be used verbatim for the pure
-                # shm backfiller approach below.
-                def diff_history(
-                    array,
-                    start_dt,
-                    end_dt,
-                ) -> np.ndarray:
-
-                    s_diff = (last_tsdb_dt - start_dt).seconds
-
-                    # if we detect a partial frame's worth of data
-                    # that is new, slice out only that history and
-                    # write to shm.
-                    if s_diff > 0:
-                        assert last_tsdb_dt > start_dt
-                        selected = array['time'] > last_tsdb_dt.timestamp()
-                        to_push = array[selected]
-                        log.info(
-                            f'Pushing partial frame {to_push.size} to shm'
-                        )
-                        return to_push
-
-                    else:
-                        return array
-
-                # start history anal and load missing new data via backend.
-                async with open_history_client(fqsn) as hist:
-
-                    # get latest query's worth of history all the way
-                    # back to what is recorded in the tsdb
-                    array, start_dt, end_dt = await hist(end_dt='')
-                    to_push = diff_history(array, start_dt, end_dt)
-                    shm.push(to_push)
-
-                    # let caller unblock and deliver latest history frame
-                    task_status.started(shm)
-                    some_data_ready.set()
-
-                    # pull new history frames until we hit latest
-                    # already in the tsdb
-                    while start_dt > last_tsdb_dt:
-                        array, start_dt, end_dt = await hist(end_dt=start_dt)
-                        to_push = diff_history(array, start_dt, end_dt)
-                        shm.push(to_push, prepend=True)
+                fastest = list(tsdb_arrays[fqsn].values())[0]
+                last_s = fastest['Epoch'][-1]

                 # TODO: see if there's faster multi-field reads:
                 # https://numpy.org/doc/stable/user/basics.rec.html#accessing-multiple-fields
                 # re-index with a `time` and index field
                 shm.push(
-                    fastest[-shm._first.value:],
+                    fastest[-3 * _secs_in_day:],

                     # insert the history pre a "days worth" of samples
                     # to leave some real-time buffer space at the end.
                     prepend=True,
-                    # start=shm._len - _secs_in_day,
+                    start=shm._len - _secs_in_day,
                     field_map={
                         'Epoch': 'time',
                         'Open': 'open',
@@ -341,24 +290,36 @@ async def manage_history(
                     },
                 )

-    # TODO: write new data to tsdb to be ready to for next
-    # read.
-
-    if do_legacy_backfill:
-        # do a legacy incremental backfill from the provider.
+                # start history anal and load missing new data via backend.
+                async with mod.open_history_client(fqsn) as hist:
+
+                    # get latest query's worth of history
+                    array, next_dt = await hist(end_dt='')
+
+                    last_dt = datetime.fromtimestamp(last_s)
+                    array, next_dt = await hist(end_dt=last_dt)
+
+            else:
+                do_backfill = True
+
+    # await tractor.breakpoint()
+    some_data_ready.set()
+
+    if do_backfill:
         log.info('No existing `marketstored` found..')

-        bfqsn = fqsn.replace('.' + mod.name, '')
         # start history backfill task ``backfill_bars()`` is
         # a required backend func this must block until shm is
         # filled with first set of ohlc bars
         await bus.nursery.start(
             start_backfill,
             mod,
-            bfqsn,
+            fqsn,
             shm,
         )
+        # _ = await bus.nursery.start(mod.backfill_bars, fqsn, shm)

     # yield back after client connect with filled shm
     task_status.started(shm)
@@ -367,18 +328,33 @@ async def manage_history(
     # data that can be used.
     some_data_ready.set()

-    # history retreival loop depending on user interaction and thus
-    # a small RPC-prot for remotely controllinlg what data is loaded
-    # for viewing.
+    # detect sample step size for sampled historical data
+    times = shm.array['time']
+    delay_s = times[-1] - times[times != times[-1]][-1]
+
+    # begin real-time updates of shm and tsb once the feed
+    # goes live.
+    await feed_is_live.wait()
+
+    if opened:
+        sampler.ohlcv_shms.setdefault(delay_s, []).append(shm)
+
+        # start shm incrementing for OHLC sampling at the current
+        # detected sampling period if one dne.
+        if sampler.incrementers.get(delay_s) is None:
+            await bus.start_task(
+                increment_ohlc_buffer,
+                delay_s,
+            )

     await trio.sleep_forever()
+    # cs.cancel()


 async def allocate_persistent_feed(
     bus: _FeedsBus,
     brokername: str,
     symbol: str,
     loglevel: str,

     start_stream: bool = True,
@@ -398,7 +374,6 @@ async def allocate_persistent_feed(
     - a real-time streaming task which connec

     '''
-    # load backend module
     try:
         mod = get_brokermod(brokername)
     except ImportError:
@@ -441,7 +416,7 @@ async def allocate_persistent_feed(
             manage_history,
             mod,
             bus,
-            '.'.join((bfqsn, brokername)),
+            bfqsn,
             some_data_ready,
             feed_is_live,
         )
@@ -454,11 +429,9 @@ async def allocate_persistent_feed(
     # true fqsn
     fqsn = '.'.join((bfqsn, brokername))

     # add a fqsn entry that includes the ``.<broker>`` suffix
-    # and an entry that includes the broker-specific fqsn (including
-    # any new suffixes or elements as injected by the backend).
     init_msg[fqsn] = msg
-    init_msg[bfqsn] = msg

     # TODO: pretty sure we don't need this? why not just leave 1s as
     # the fastest "sample period" since we'll probably always want that
@@ -472,14 +445,13 @@ async def allocate_persistent_feed(
     await some_data_ready.wait()

     # append ``.<broker>`` suffix to each quote symbol
-    acceptable_not_fqsn_with_broker_suffix = symbol + f'.{brokername}'
+    bsym = symbol + f'.{brokername}'

     generic_first_quotes = {
-        acceptable_not_fqsn_with_broker_suffix: first_quote,
+        bsym: first_quote,
         fqsn: first_quote,
     }

-    bus.feeds[symbol] = bus.feeds[bfqsn] = (
+    bus.feeds[symbol] = bus.feeds[fqsn] = (
         init_msg,
         generic_first_quotes,
     )
@@ -492,22 +464,9 @@ async def allocate_persistent_feed(
     if not start_stream:
         await trio.sleep_forever()

-    # begin real-time updates of shm and tsb once the feed goes live and
-    # the backend will indicate when real-time quotes have begun.
+    # backend will indicate when real-time quotes have begun.
     await feed_is_live.wait()

-    # start shm incrementer task for OHLC style sampling
-    # at the current detected step period.
-    times = shm.array['time']
-    delay_s = times[-1] - times[times != times[-1]][-1]
-
-    sampler.ohlcv_shms.setdefault(delay_s, []).append(shm)
-    if sampler.incrementers.get(delay_s) is None:
-        await bus.start_task(
-            increment_ohlc_buffer,
-            delay_s,
-        )
-
     sum_tick_vlm: bool = init_msg.get(
         'shm_write_opts', {}
     ).get('sum_tick_vlm', True)
@@ -530,7 +489,7 @@ async def open_feed_bus(
     ctx: tractor.Context,
     brokername: str,
-    symbol: str,  # normally expected to the broker-specific fqsn
+    symbol: str,
     loglevel: str,
     tick_throttle: Optional[float] = None,
     start_stream: bool = True,
@@ -552,9 +511,7 @@ async def open_feed_bus(
     # TODO: check for any stale shm entries for this symbol
     # (after we also group them in a nice `/dev/shm/piker/` subdir).
     # ensure we are who we think we are
-    servicename = tractor.current_actor().name
-    assert 'brokerd' in servicename
-    assert brokername in servicename
+    assert 'brokerd' in tractor.current_actor().name

     bus = get_feed_bus(brokername)
@@ -564,7 +521,7 @@ async def open_feed_bus(
     entry = bus.feeds.get(symbol)
     if entry is None:
         # allocate a new actor-local stream bus which
-        # will persist for this `brokerd`'s service lifetime.
+        # will persist for this `brokerd`.
         async with bus.task_lock:
             await bus.nursery.start(
                 partial(
@@ -588,12 +545,12 @@ async def open_feed_bus(
     init_msg, first_quotes = bus.feeds[symbol]

     msg = init_msg[symbol]
-    bfqsn = msg['fqsn'].lower()
+    bfqsn = msg['fqsn']

     # true fqsn
     fqsn = '.'.join([bfqsn, brokername])
     assert fqsn in first_quotes
-    assert bus.feeds[bfqsn]
+    assert bus.feeds[fqsn]

     # broker-ambiguous symbol (provided on cli - eg. mnq.globex.ib)
     bsym = symbol + f'.{brokername}'
@@ -907,10 +864,7 @@ async def maybe_open_feed(
     **kwargs,

-) -> (
-    Feed,
-    ReceiveChannel[dict[str, Any]],
-):
+) -> (Feed, ReceiveChannel[dict[str, Any]]):
     '''
     Maybe open a data to a ``brokerd`` daemon only if there is no
     local one for the broker-symbol pair, if one is cached use it wrapped
@@ -931,7 +885,6 @@ async def maybe_open_feed(
             'start_stream': kwargs.get('start_stream', True),
         },
         key=fqsn,
-
     ) as (cache_hit, feed):
         if cache_hit:

View File

@@ -29,13 +29,14 @@ from typing import (
     Any,
     Optional,
     Union,
+    # Callable,
+    # TYPE_CHECKING,
 )
 import time
 from math import isnan

 from bidict import bidict
 import msgpack
-import pyqtgraph as pg
 import numpy as np
 import pandas as pd
 import tractor
@@ -48,8 +49,15 @@ from anyio_marketstore import (
 import purerpc

 from .feed import maybe_open_feed
+from ._source import (
+    mk_fqsn,
+    # Symbol,
+)
 from ..log import get_logger, get_console_log

+# if TYPE_CHECKING:
+#     from ._sharedmem import ShmArray


 log = get_logger(__name__)
@@ -227,16 +235,6 @@ class Storage:
         # series' cache from tsdb reads
         self._arrays: dict[str, np.ndarray] = {}

-    async def list_keys(self) -> list[str]:
-        return await self.client.list_symbols()
-
-    async def search_keys(self, pattern: str) -> list[str]:
-        '''
-        Search for time series key in the storage backend.
-
-        '''
-        ...
-
     async def write_ticks(self, ticks: list) -> None:
         ...
@@ -258,29 +256,14 @@ class Storage:
         if fqsn not in syms:
             return {}

-        tfstr = tf_in_1s[1]
-
-        params = Params(
-            symbols=fqsn,
-            timeframe=tfstr,
-            attrgroup='OHLCV',
-            # limit_from_start=True,
-
-            # TODO: figure the max limit here given the
-            # ``purepc`` msg size limit of purerpc: 33554432
-            limit=int(800e3),
-        )
-
         if timeframe is None:
             log.info(f'starting {fqsn} tsdb granularity scan..')

             # loop through and try to find highest granularity
             for tfstr in tf_in_1s.values():
                 try:
                     log.info(f'querying for {tfstr}@{fqsn}')
-                    params.set('timeframe', tfstr)
-                    result = await client.query(params)
+                    result = await client.query(Params(fqsn, tfstr, 'OHLCV',))
                     break
                 except purerpc.grpclib.exceptions.UnknownError:
                     # XXX: this is already logged by the container and
                     # thus shows up through `marketstored` logs relay.
@@ -290,11 +273,9 @@ class Storage:
                     return {}

         else:
-            result = await client.query(params)
+            tfstr = tf_in_1s[timeframe]
+            result = await client.query(Params(fqsn, tfstr, 'OHLCV',))

-        # TODO: it turns out column access on recarrays is actually slower:
-        # https://jakevdp.github.io/PythonDataScienceHandbook/02.09-structured-data-numpy.html#RecordArrays:-Structured-Arrays-with-a-Twist
-        # it might make sense to make these structured arrays?
         # Fill out a `numpy` array-results map
         arrays = {}
         for fqsn, data_set in result.by_symbols().items():
@@ -302,22 +283,7 @@ class Storage:
                 tf_in_1s.inverse[data_set.timeframe]
             ] = data_set.array

-        return arrays[fqsn][timeframe] if timeframe else arrays[fqsn]
-
-    async def delete_ts(
-        self,
-        key: str,
-        timeframe: Optional[Union[int, str]] = None,
-
-    ) -> bool:
-
-        client = self.client
-        syms = await client.list_symbols()
-        print(syms)
-        # if key not in syms:
-        #     raise KeyError(f'`{fqsn}` table key not found?')
-
-        return await client.destroy(tbk=key)
+        return arrays[fqsn][timeframe] if timeframe else arrays


 @acm
@@ -330,16 +296,19 @@ async def open_storage_client(
     Load a series by key and deliver in ``numpy`` struct array format.

     '''
-    async with (
-        # eventually a storage backend endpoint
-        get_client() as client,
-    ):
-        # slap on our wrapper api
-        yield Storage(client)
+    async with get_client() as client:
+
+        storage_client = Storage(client)
+        arrays = await storage_client.read_ohlcv(
+            fqsn,
+            period,
+        )
+
+        yield storage_client, arrays


-async def tsdb_history_update(
-    fqsn: str,
+async def backfill_history_diff(
+    # symbol: Symbol

 ) -> list[str]:
@@ -369,64 +338,73 @@ async def tsdb_history_update(
     # * the original data feed arch blurb:
     #   - https://github.com/pikers/piker/issues/98
     #
-    profiler = pg.debug.Profiler(
-        disabled=False,  # not pg_profile_enabled(),
-        delayed=False,
-    )
+    broker = 'ib'
+    symbol = 'mnq.globex'
+
+    # broker = 'binance'
+    # symbol = 'btcusdt'
+
+    fqsn = mk_fqsn(broker, symbol)

     async with (
-        open_storage_client(fqsn) as storage,
+        get_client() as client,
         maybe_open_feed(
-            [fqsn],
+            broker,
+            [symbol],
+            loglevel='info',
+            # backpressure=False,
             start_stream=False,

         ) as (feed, stream),
     ):
-        profiler(f'opened feed for {fqsn}')
-
-        symbol = feed.symbols.get(fqsn)
-        if symbol:
-            fqsn = symbol.front_fqsn()
-
-        syms = await storage.client.list_symbols()
-        log.info(f'Existing tsdb symbol set:\n{pformat(syms)}')
-        profiler(f'listed symbols {syms}')
+        syms = await client.list_symbols()
+        log.info(f'Existing symbol set:\n{pformat(syms)}')

         # diff db history with shm and only write the missing portions
         ohlcv = feed.shm.array

-        # TODO: use pg profiler
-        tsdb_arrays = await storage.read_ohlcv(fqsn)
-
-        to_append = feed.shm.array
-        to_prepend = None
-
-        # hist diffing
-        if tsdb_arrays:
-            onesec = tsdb_arrays[1]
-            to_append = ohlcv[ohlcv['time'] > onesec['Epoch'][-1]]
-            to_prepend = ohlcv[ohlcv['time'] < onesec['Epoch'][0]]
-
-        profiler('Finished db arrays diffs')
-
-        for array in [to_append, to_prepend]:
-            if array is None:
-                continue
+        key = (fqsn, '1Sec', 'OHLCV')
+        tbk = mk_tbk(key)
+
+        # diff vs. existing array and append new history
+        # TODO:
+
+        # TODO: should be no error?
+        # assert not resp.responses
+
+        start = time.time()
+
+        qr = await client.query(
+            # Params(fqsn, '1Sec`', 'OHLCV',)
+            Params(*key),
+        )
+        # # Dig out `numpy` results map
+        arrays: dict[tuple[str, int], np.ndarray] = {}
+        for name, data_set in qr.by_symbols().items():
+            in_secs = tf_in_1s.inverse[data_set.timeframe]
+            arrays[(name, in_secs)] = data_set.array
+
+        s1 = arrays[(fqsn, 1)]
+        to_append = ohlcv[ohlcv['time'] > s1['Epoch'][-1]]
+
+        end_diff = time.time()
+        diff_ms = round((end_diff - start) * 1e3, ndigits=2)

             log.info(
-                f'Writing datums {array.size} -> to tsdb from shm\n'
+                f'Appending {to_append.size} datums to tsdb from shm\n'
+                f'Total diff time: {diff_ms} ms'
             )

             # build mkts schema compat array for writing
             mkts_dt = np.dtype(_ohlcv_dt)
             mkts_array = np.zeros(
-                len(array),
+                len(to_append),
                 dtype=mkts_dt,
             )
             # copy from shm array (yes it's this easy):
             # https://numpy.org/doc/stable/user/basics.rec.html#assignment-from-other-structured-arrays
-            mkts_array[:] = array[[
+            mkts_array[:] = to_append[[
                 'time',
                 'open',
                 'high',
@@ -436,28 +414,32 @@ async def tsdb_history_update(
             ]]

             # write to db
-            resp = await storage.client.write(
+            resp = await client.write(
                 mkts_array,
-                tbk=f'{fqsn}/1Sec/OHLCV',
+                tbk=tbk,
                 # NOTE: will will append duplicates
                 # for the same timestamp-index.
-                # TODO: pre deduplicate?
                 isvariablelength=True,
             )

+            end_write = time.time()
+            diff_ms = round((end_write - end_diff) * 1e3, ndigits=2)
             log.info(
                 f'Wrote {to_append.size} datums to tsdb\n'
+                f'Total write time: {diff_ms} ms'
             )
-            profiler('Finished db writes')

             for resp in resp.responses:
                 err = resp.error
                 if err:
                     raise MarketStoreError(err)

-        from tractor.trionics import ipython_embed
-        await ipython_embed()
+        # TODO: backfiller loop
+        from piker.ui._compression import downsample
+        x, y = downsample(
+            s1['Epoch'],
+            s1['Close'],
+            bins=10,
+        )
+        await tractor.breakpoint()


 async def ingest_quote_stream(

View File

@@ -7,7 +7,3 @@
 # pin this to a dev branch that we have more control over especially
 # as more graphics stuff gets hashed out.
 -e git+https://github.com/pikers/pyqtgraph.git@piker_pin#egg=pyqtgraph
-
-# our async client for ``marketstore`` (the tsdb)
--e git+https://github.com/pikers/anyio-marketstore.git@master#egg=anyio-marketstore

View File

@@ -77,14 +77,6 @@ setup(
         # tsdbs
         'pymarketstore',
     ],
-    extras_require={
-        # serialization
-        'tsdb': [
-            'docker',
-        ],
-    },
-
     tests_require=['pytest'],
     python_requires=">=3.9",  # literally for ``datetime.datetime.fromisoformat``...
     keywords=["async", "trading", "finance", "quant", "charting"],