Compare commits
10 Commits
87fab6c5b1
...
c3553de1f2
Author | SHA1 | Date |
---|---|---|
Tyler Goodlet | c3553de1f2 | |
Tyler Goodlet | 0790ae8235 | |
Tyler Goodlet | a6b8c03e0e | |
Tyler Goodlet | ac22190b60 | |
Tyler Goodlet | 75d0bf3152 | |
Tyler Goodlet | 85e2602d2e | |
Tyler Goodlet | edcac1e768 | |
Tyler Goodlet | 73b8719984 | |
Tyler Goodlet | 6b17370711 | |
Tyler Goodlet | cd3dbc9275 |
|
@ -24,7 +24,6 @@ from typing import (
|
||||||
# Any,
|
# Any,
|
||||||
)
|
)
|
||||||
from contextlib import asynccontextmanager as acm
|
from contextlib import asynccontextmanager as acm
|
||||||
# import time
|
|
||||||
|
|
||||||
import trio
|
import trio
|
||||||
from trio_typing import TaskStatus
|
from trio_typing import TaskStatus
|
||||||
|
@ -97,6 +96,7 @@ async def open_docker(
|
||||||
base_url=url,
|
base_url=url,
|
||||||
**kwargs
|
**kwargs
|
||||||
) if url else docker.from_env(**kwargs)
|
) if url else docker.from_env(**kwargs)
|
||||||
|
|
||||||
yield client
|
yield client
|
||||||
|
|
||||||
except (
|
except (
|
||||||
|
@ -127,43 +127,10 @@ async def open_docker(
|
||||||
|
|
||||||
finally:
|
finally:
|
||||||
if client:
|
if client:
|
||||||
# for c in client.containers.list():
|
|
||||||
# c.kill()
|
|
||||||
client.close()
|
client.close()
|
||||||
# client.api._custom_adapter.close()
|
# client.api._custom_adapter.close()
|
||||||
|
for c in client.containers.list():
|
||||||
|
c.kill()
|
||||||
# async def waitfor(
|
|
||||||
# cntr: Container,
|
|
||||||
# attr_path: tuple[str],
|
|
||||||
# expect=None,
|
|
||||||
# timeout: float = 0.5,
|
|
||||||
|
|
||||||
# ) -> Any:
|
|
||||||
# '''
|
|
||||||
# Wait for a container's attr value to be set. If ``expect`` is
|
|
||||||
# provided wait for the value to be set to that value.
|
|
||||||
|
|
||||||
# This is an async version of the helper from our ``pytest-dockerctl``
|
|
||||||
# plugin.
|
|
||||||
|
|
||||||
# '''
|
|
||||||
# def get(val, path):
|
|
||||||
# for key in path:
|
|
||||||
# val = val[key]
|
|
||||||
# return val
|
|
||||||
|
|
||||||
# start = time.time()
|
|
||||||
# while time.time() - start < timeout:
|
|
||||||
# cntr.reload()
|
|
||||||
# val = get(cntr.attrs, attr_path)
|
|
||||||
# if expect is None and val:
|
|
||||||
# return val
|
|
||||||
# elif val == expect:
|
|
||||||
# return val
|
|
||||||
# else:
|
|
||||||
# raise TimeoutError("{} failed to be {}, value: \"{}\"".format(
|
|
||||||
# attr_path, expect if expect else 'not None', val))
|
|
||||||
|
|
||||||
|
|
||||||
@tractor.context
|
@tractor.context
|
||||||
|
@ -220,7 +187,7 @@ async def open_marketstored(
|
||||||
},
|
},
|
||||||
mounts=[config_dir_mnt, data_dir_mnt],
|
mounts=[config_dir_mnt, data_dir_mnt],
|
||||||
detach=True,
|
detach=True,
|
||||||
stop_signal='SIGINT',
|
# stop_signal='SIGINT',
|
||||||
init=True,
|
init=True,
|
||||||
# remove=True,
|
# remove=True,
|
||||||
)
|
)
|
||||||
|
@ -247,7 +214,7 @@ async def open_marketstored(
|
||||||
seen_so_far.add(entry)
|
seen_so_far.add(entry)
|
||||||
if bp_on_msg:
|
if bp_on_msg:
|
||||||
await tractor.breakpoint()
|
await tractor.breakpoint()
|
||||||
getattr(log, level)(f'{msg}')
|
getattr(log, level, log.error)(f'{msg}')
|
||||||
|
|
||||||
# if "launching tcp listener for all services..." in msg:
|
# if "launching tcp listener for all services..." in msg:
|
||||||
if match in msg:
|
if match in msg:
|
||||||
|
@ -269,6 +236,8 @@ async def open_marketstored(
|
||||||
)
|
)
|
||||||
|
|
||||||
await ctx.started(cntr.id)
|
await ctx.started(cntr.id)
|
||||||
|
|
||||||
|
# block for the expected "teardown log msg"..
|
||||||
await process_logs_until('exiting...',)
|
await process_logs_until('exiting...',)
|
||||||
|
|
||||||
except (
|
except (
|
||||||
|
|
|
@ -22,8 +22,7 @@ from typing import Any
|
||||||
import decimal
|
import decimal
|
||||||
|
|
||||||
import numpy as np
|
import numpy as np
|
||||||
import pandas as pd
|
from pydantic import BaseModel
|
||||||
from pydantic import BaseModel, validate_arguments
|
|
||||||
# from numba import from_dtype
|
# from numba import from_dtype
|
||||||
|
|
||||||
|
|
||||||
|
@ -254,61 +253,6 @@ class Symbol(BaseModel):
|
||||||
return keys
|
return keys
|
||||||
|
|
||||||
|
|
||||||
def from_df(
|
|
||||||
|
|
||||||
df: pd.DataFrame,
|
|
||||||
source=None,
|
|
||||||
default_tf=None
|
|
||||||
|
|
||||||
) -> np.recarray:
|
|
||||||
"""Convert OHLC formatted ``pandas.DataFrame`` to ``numpy.recarray``.
|
|
||||||
|
|
||||||
"""
|
|
||||||
df.reset_index(inplace=True)
|
|
||||||
|
|
||||||
# hackery to convert field names
|
|
||||||
date = 'Date'
|
|
||||||
if 'date' in df.columns:
|
|
||||||
date = 'date'
|
|
||||||
|
|
||||||
# convert to POSIX time
|
|
||||||
df[date] = [d.timestamp() for d in df[date]]
|
|
||||||
|
|
||||||
# try to rename from some camel case
|
|
||||||
columns = {
|
|
||||||
'Date': 'time',
|
|
||||||
'date': 'time',
|
|
||||||
'Open': 'open',
|
|
||||||
'High': 'high',
|
|
||||||
'Low': 'low',
|
|
||||||
'Close': 'close',
|
|
||||||
'Volume': 'volume',
|
|
||||||
|
|
||||||
# most feeds are providing this over sesssion anchored
|
|
||||||
'vwap': 'bar_wap',
|
|
||||||
|
|
||||||
# XXX: ib_insync calls this the "wap of the bar"
|
|
||||||
# but no clue what is actually is...
|
|
||||||
# https://github.com/pikers/piker/issues/119#issuecomment-729120988
|
|
||||||
'average': 'bar_wap',
|
|
||||||
}
|
|
||||||
|
|
||||||
df = df.rename(columns=columns)
|
|
||||||
|
|
||||||
for name in df.columns:
|
|
||||||
# if name not in base_ohlc_dtype.names[1:]:
|
|
||||||
if name not in base_ohlc_dtype.names:
|
|
||||||
del df[name]
|
|
||||||
|
|
||||||
# TODO: it turns out column access on recarrays is actually slower:
|
|
||||||
# https://jakevdp.github.io/PythonDataScienceHandbook/02.09-structured-data-numpy.html#RecordArrays:-Structured-Arrays-with-a-Twist
|
|
||||||
# it might make sense to make these structured arrays?
|
|
||||||
array = df.to_records(index=False)
|
|
||||||
_nan_to_closest_num(array)
|
|
||||||
|
|
||||||
return array
|
|
||||||
|
|
||||||
|
|
||||||
def _nan_to_closest_num(array: np.ndarray):
|
def _nan_to_closest_num(array: np.ndarray):
|
||||||
"""Return interpolated values instead of NaN.
|
"""Return interpolated values instead of NaN.
|
||||||
|
|
||||||
|
|
|
@ -126,7 +126,7 @@ def ms_stream(
|
||||||
)
|
)
|
||||||
@click.argument('symbols', nargs=-1)
|
@click.argument('symbols', nargs=-1)
|
||||||
@click.pass_obj
|
@click.pass_obj
|
||||||
def ms_shell(
|
def storesh(
|
||||||
config,
|
config,
|
||||||
tl,
|
tl,
|
||||||
host,
|
host,
|
||||||
|
@ -137,43 +137,18 @@ def ms_shell(
|
||||||
Start an IPython shell ready to query the local marketstore db.
|
Start an IPython shell ready to query the local marketstore db.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
from piker.data.marketstore import backfill_history_diff
|
from piker.data.marketstore import tsdb_history_update
|
||||||
from piker._daemon import open_piker_runtime
|
from piker._daemon import open_piker_runtime
|
||||||
|
|
||||||
async def main():
|
async def main():
|
||||||
nonlocal symbols
|
nonlocal symbols
|
||||||
|
|
||||||
async with open_piker_runtime(
|
async with open_piker_runtime(
|
||||||
'ms_shell',
|
'storesh',
|
||||||
enable_modules=['piker.data._ahab'],
|
enable_modules=['piker.data._ahab'],
|
||||||
):
|
):
|
||||||
try:
|
symbol = symbols[0]
|
||||||
await backfill_history_diff()
|
await tsdb_history_update(symbol)
|
||||||
except OSError:
|
|
||||||
# TODO: write magics to query marketstore
|
|
||||||
|
|
||||||
sym = symbols[0]
|
|
||||||
symbol, _, broker = sym.rpartition('.')
|
|
||||||
# (maybe) allocate shm array for this broker/symbol which will
|
|
||||||
# be used for fast near-term history capture and processing.
|
|
||||||
shm, opened = maybe_open_shm_array(
|
|
||||||
key=sym,
|
|
||||||
dtype=base_iohlc_dtype,
|
|
||||||
)
|
|
||||||
# load anything found in shm
|
|
||||||
from numpy.lib.recfunctions import structured_to_unstructured
|
|
||||||
mxmn = structured_to_unstructured(
|
|
||||||
shm.array[['low', 'high']],
|
|
||||||
# dtype=[('mxmn', '<f8'), ('index', '<i8')],
|
|
||||||
).flatten()
|
|
||||||
|
|
||||||
from piker.ui._compression import downsample
|
|
||||||
xd, yd = downsample(
|
|
||||||
y=mxmn,
|
|
||||||
x=np.arange(len(mxmn)),
|
|
||||||
bins=4,
|
|
||||||
)
|
|
||||||
await tractor.breakpoint()
|
|
||||||
|
|
||||||
trio.run(main)
|
trio.run(main)
|
||||||
|
|
||||||
|
@ -184,10 +159,11 @@ def ms_shell(
|
||||||
@click.argument('name', nargs=1, required=True)
|
@click.argument('name', nargs=1, required=True)
|
||||||
@click.pass_obj
|
@click.pass_obj
|
||||||
def ingest(config, name, test_file, tl):
|
def ingest(config, name, test_file, tl):
|
||||||
"""Ingest real-time broker quotes and ticks to a marketstore instance.
|
'''
|
||||||
"""
|
Ingest real-time broker quotes and ticks to a marketstore instance.
|
||||||
|
|
||||||
|
'''
|
||||||
# global opts
|
# global opts
|
||||||
brokermods = config['brokermods']
|
|
||||||
loglevel = config['loglevel']
|
loglevel = config['loglevel']
|
||||||
tractorloglevel = config['tractorloglevel']
|
tractorloglevel = config['tractorloglevel']
|
||||||
# log = config['log']
|
# log = config['log']
|
||||||
|
|
|
@ -22,7 +22,6 @@ This module is enabled for ``brokerd`` daemons.
|
||||||
"""
|
"""
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
from dataclasses import dataclass, field
|
from dataclasses import dataclass, field
|
||||||
from datetime import datetime
|
|
||||||
from contextlib import asynccontextmanager
|
from contextlib import asynccontextmanager
|
||||||
from functools import partial
|
from functools import partial
|
||||||
from types import ModuleType
|
from types import ModuleType
|
||||||
|
@ -32,11 +31,14 @@ from typing import (
|
||||||
Awaitable,
|
Awaitable,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
import pendulum
|
||||||
import trio
|
import trio
|
||||||
from trio.abc import ReceiveChannel
|
from trio.abc import ReceiveChannel
|
||||||
from trio_typing import TaskStatus
|
from trio_typing import TaskStatus
|
||||||
import tractor
|
import tractor
|
||||||
from pydantic import BaseModel
|
from pydantic import BaseModel
|
||||||
|
import numpy as np
|
||||||
|
|
||||||
from ..brokers import get_brokermod
|
from ..brokers import get_brokermod
|
||||||
from .._cacheables import maybe_open_context
|
from .._cacheables import maybe_open_context
|
||||||
|
@ -49,7 +51,6 @@ from ._sharedmem import (
|
||||||
maybe_open_shm_array,
|
maybe_open_shm_array,
|
||||||
attach_shm_array,
|
attach_shm_array,
|
||||||
ShmArray,
|
ShmArray,
|
||||||
_secs_in_day,
|
|
||||||
)
|
)
|
||||||
from .ingest import get_ingestormod
|
from .ingest import get_ingestormod
|
||||||
from ._source import (
|
from ._source import (
|
||||||
|
@ -192,6 +193,22 @@ async def _setup_persistent_brokerd(
|
||||||
await trio.sleep_forever()
|
await trio.sleep_forever()
|
||||||
|
|
||||||
|
|
||||||
|
async def start_backfill(
|
||||||
|
mod: ModuleType,
|
||||||
|
fqsn: str,
|
||||||
|
shm: ShmArray,
|
||||||
|
|
||||||
|
task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,
|
||||||
|
|
||||||
|
) -> int:
|
||||||
|
|
||||||
|
return await mod.backfill_bars(
|
||||||
|
fqsn,
|
||||||
|
shm,
|
||||||
|
task_status=task_status,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
async def manage_history(
|
async def manage_history(
|
||||||
mod: ModuleType,
|
mod: ModuleType,
|
||||||
bus: _FeedsBus,
|
bus: _FeedsBus,
|
||||||
|
@ -220,104 +237,150 @@ async def manage_history(
|
||||||
# we expect the sub-actor to write
|
# we expect the sub-actor to write
|
||||||
readonly=False,
|
readonly=False,
|
||||||
)
|
)
|
||||||
|
# TODO: history validation
|
||||||
|
if not opened:
|
||||||
|
raise RuntimeError(
|
||||||
|
"Persistent shm for sym was already open?!"
|
||||||
|
)
|
||||||
|
|
||||||
log.info('Scanning for existing `marketstored`')
|
log.info('Scanning for existing `marketstored`')
|
||||||
|
|
||||||
is_up = await check_for_service('marketstored')
|
is_up = await check_for_service('marketstored')
|
||||||
if is_up and opened:
|
|
||||||
|
# for now only do backfilling if no tsdb can be found
|
||||||
|
do_legacy_backfill = not is_up and opened
|
||||||
|
|
||||||
|
open_history_client = getattr(mod, 'open_history_client', None)
|
||||||
|
|
||||||
|
if is_up and opened and open_history_client:
|
||||||
|
|
||||||
log.info('Found existing `marketstored`')
|
log.info('Found existing `marketstored`')
|
||||||
from . import marketstore
|
from . import marketstore
|
||||||
|
|
||||||
async with marketstore.open_storage_client(
|
async with marketstore.open_storage_client(
|
||||||
fqsn,
|
fqsn,
|
||||||
) as (storage, tsdb_arrays):
|
) as storage:
|
||||||
|
|
||||||
# TODO: history validation
|
tsdb_arrays = await storage.read_ohlcv(fqsn)
|
||||||
# assert opened, f'Persistent shm for {symbol} was already open?!'
|
|
||||||
# if not opened:
|
|
||||||
# raise RuntimeError(
|
|
||||||
# "Persistent shm for sym was already open?!"
|
|
||||||
# )
|
|
||||||
|
|
||||||
if tsdb_arrays:
|
if not tsdb_arrays:
|
||||||
|
do_legacy_backfill = True
|
||||||
|
|
||||||
|
else:
|
||||||
log.info(f'Loaded tsdb history {tsdb_arrays}')
|
log.info(f'Loaded tsdb history {tsdb_arrays}')
|
||||||
fastest = list(tsdb_arrays[fqsn].values())[0]
|
|
||||||
last_s = fastest['Epoch'][-1]
|
|
||||||
|
|
||||||
# TODO: see if there's faster multi-field reads:
|
fastest = list(tsdb_arrays.values())[0]
|
||||||
# https://numpy.org/doc/stable/user/basics.rec.html#accessing-multiple-fields
|
times = fastest['Epoch']
|
||||||
|
first, last = times[0], times[-1]
|
||||||
# re-index with a `time` and index field
|
first_tsdb_dt, last_tsdb_dt = map(
|
||||||
shm.push(
|
pendulum.from_timestamp, [first, last]
|
||||||
fastest[-3 * _secs_in_day:],
|
|
||||||
|
|
||||||
# insert the history pre a "days worth" of samples
|
|
||||||
# to leave some real-time buffer space at the end.
|
|
||||||
prepend=True,
|
|
||||||
start=shm._len - _secs_in_day,
|
|
||||||
field_map={
|
|
||||||
'Epoch': 'time',
|
|
||||||
'Open': 'open',
|
|
||||||
'High': 'high',
|
|
||||||
'Low': 'low',
|
|
||||||
'Close': 'close',
|
|
||||||
'Volume': 'volume',
|
|
||||||
},
|
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# TODO: this should be used verbatim for the pure
|
||||||
|
# shm backfiller approach below.
|
||||||
|
|
||||||
|
def diff_history(
|
||||||
|
array,
|
||||||
|
start_dt,
|
||||||
|
end_dt,
|
||||||
|
|
||||||
|
) -> np.ndarray:
|
||||||
|
|
||||||
|
s_diff = (last_tsdb_dt - start_dt).seconds
|
||||||
|
|
||||||
|
# if we detect a partial frame's worth of data
|
||||||
|
# that is new, slice out only that history and
|
||||||
|
# write to shm.
|
||||||
|
if s_diff > 0:
|
||||||
|
assert last_tsdb_dt > start_dt
|
||||||
|
selected = array['time'] > last_tsdb_dt.timestamp()
|
||||||
|
to_push = array[selected]
|
||||||
|
log.info(
|
||||||
|
f'Pushing partial frame {to_push.size} to shm'
|
||||||
|
)
|
||||||
|
return to_push
|
||||||
|
|
||||||
|
else:
|
||||||
|
return array
|
||||||
|
|
||||||
# start history anal and load missing new data via backend.
|
# start history anal and load missing new data via backend.
|
||||||
async with mod.open_history_client(fqsn) as hist:
|
async with open_history_client(fqsn) as hist:
|
||||||
|
|
||||||
# get latest query's worth of history
|
# get latest query's worth of history all the way
|
||||||
array, next_dt = await hist(end_dt='')
|
# back to what is recorded in the tsdb
|
||||||
|
array, start_dt, end_dt = await hist(end_dt='')
|
||||||
|
to_push = diff_history(array, start_dt, end_dt)
|
||||||
|
shm.push(to_push)
|
||||||
|
|
||||||
last_dt = datetime.fromtimestamp(last_s)
|
# let caller unblock and deliver latest history frame
|
||||||
array, next_dt = await hist(end_dt=last_dt)
|
task_status.started(shm)
|
||||||
|
some_data_ready.set()
|
||||||
|
|
||||||
some_data_ready.set()
|
# pull new history frames until we hit latest
|
||||||
|
# already in the tsdb
|
||||||
|
while start_dt > last_tsdb_dt:
|
||||||
|
array, start_dt, end_dt = await hist(end_dt=start_dt)
|
||||||
|
to_push = diff_history(array, start_dt, end_dt)
|
||||||
|
shm.push(to_push, prepend=True)
|
||||||
|
|
||||||
elif opened:
|
# TODO: see if there's faster multi-field reads:
|
||||||
|
# https://numpy.org/doc/stable/user/basics.rec.html#accessing-multiple-fields
|
||||||
|
# re-index with a `time` and index field
|
||||||
|
shm.push(
|
||||||
|
fastest[-shm._first.value:],
|
||||||
|
|
||||||
|
# insert the history pre a "days worth" of samples
|
||||||
|
# to leave some real-time buffer space at the end.
|
||||||
|
prepend=True,
|
||||||
|
# start=shm._len - _secs_in_day,
|
||||||
|
field_map={
|
||||||
|
'Epoch': 'time',
|
||||||
|
'Open': 'open',
|
||||||
|
'High': 'high',
|
||||||
|
'Low': 'low',
|
||||||
|
'Close': 'close',
|
||||||
|
'Volume': 'volume',
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
|
# TODO: write new data to tsdb to be ready to for next
|
||||||
|
# read.
|
||||||
|
|
||||||
|
if do_legacy_backfill:
|
||||||
|
# do a legacy incremental backfill from the provider.
|
||||||
log.info('No existing `marketstored` found..')
|
log.info('No existing `marketstored` found..')
|
||||||
|
|
||||||
|
bfqsn = fqsn.replace('.' + mod.name, '')
|
||||||
# start history backfill task ``backfill_bars()`` is
|
# start history backfill task ``backfill_bars()`` is
|
||||||
# a required backend func this must block until shm is
|
# a required backend func this must block until shm is
|
||||||
# filled with first set of ohlc bars
|
# filled with first set of ohlc bars
|
||||||
_ = await bus.nursery.start(mod.backfill_bars, fqsn, shm)
|
await bus.nursery.start(
|
||||||
|
start_backfill,
|
||||||
|
mod,
|
||||||
|
bfqsn,
|
||||||
|
shm,
|
||||||
|
)
|
||||||
|
|
||||||
# yield back after client connect with filled shm
|
# yield back after client connect with filled shm
|
||||||
task_status.started(shm)
|
task_status.started(shm)
|
||||||
|
|
||||||
# indicate to caller that feed can be delivered to
|
# indicate to caller that feed can be delivered to
|
||||||
# remote requesting client since we've loaded history
|
# remote requesting client since we've loaded history
|
||||||
# data that can be used.
|
# data that can be used.
|
||||||
some_data_ready.set()
|
some_data_ready.set()
|
||||||
|
|
||||||
# detect sample step size for sampled historical data
|
|
||||||
times = shm.array['time']
|
|
||||||
delay_s = times[-1] - times[times != times[-1]][-1]
|
|
||||||
|
|
||||||
# begin real-time updates of shm and tsb once the feed
|
|
||||||
# goes live.
|
|
||||||
await feed_is_live.wait()
|
|
||||||
|
|
||||||
if opened:
|
|
||||||
sampler.ohlcv_shms.setdefault(delay_s, []).append(shm)
|
|
||||||
|
|
||||||
# start shm incrementing for OHLC sampling at the current
|
|
||||||
# detected sampling period if one dne.
|
|
||||||
if sampler.incrementers.get(delay_s) is None:
|
|
||||||
await bus.start_task(
|
|
||||||
increment_ohlc_buffer,
|
|
||||||
delay_s,
|
|
||||||
)
|
|
||||||
|
|
||||||
|
# history retreival loop depending on user interaction and thus
|
||||||
|
# a small RPC-prot for remotely controllinlg what data is loaded
|
||||||
|
# for viewing.
|
||||||
await trio.sleep_forever()
|
await trio.sleep_forever()
|
||||||
# cs.cancel()
|
|
||||||
|
|
||||||
|
|
||||||
async def allocate_persistent_feed(
|
async def allocate_persistent_feed(
|
||||||
bus: _FeedsBus,
|
bus: _FeedsBus,
|
||||||
|
|
||||||
brokername: str,
|
brokername: str,
|
||||||
symbol: str,
|
symbol: str,
|
||||||
|
|
||||||
loglevel: str,
|
loglevel: str,
|
||||||
start_stream: bool = True,
|
start_stream: bool = True,
|
||||||
|
|
||||||
|
@ -337,6 +400,7 @@ async def allocate_persistent_feed(
|
||||||
- a real-time streaming task which connec
|
- a real-time streaming task which connec
|
||||||
|
|
||||||
'''
|
'''
|
||||||
|
# load backend module
|
||||||
try:
|
try:
|
||||||
mod = get_brokermod(brokername)
|
mod = get_brokermod(brokername)
|
||||||
except ImportError:
|
except ImportError:
|
||||||
|
@ -361,8 +425,10 @@ async def allocate_persistent_feed(
|
||||||
loglevel=loglevel,
|
loglevel=loglevel,
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
# the broker-specific fully qualified symbol name
|
# the broker-specific fully qualified symbol name,
|
||||||
bfqsn = init_msg[symbol]['fqsn']
|
# but ensure it is lower-cased for external use.
|
||||||
|
bfqsn = init_msg[symbol]['fqsn'].lower()
|
||||||
|
init_msg[symbol]['fqsn'] = bfqsn
|
||||||
|
|
||||||
# HISTORY, run 2 tasks:
|
# HISTORY, run 2 tasks:
|
||||||
# - a history loader / maintainer
|
# - a history loader / maintainer
|
||||||
|
@ -377,7 +443,7 @@ async def allocate_persistent_feed(
|
||||||
manage_history,
|
manage_history,
|
||||||
mod,
|
mod,
|
||||||
bus,
|
bus,
|
||||||
bfqsn,
|
'.'.join((bfqsn, brokername)),
|
||||||
some_data_ready,
|
some_data_ready,
|
||||||
feed_is_live,
|
feed_is_live,
|
||||||
)
|
)
|
||||||
|
@ -390,9 +456,11 @@ async def allocate_persistent_feed(
|
||||||
|
|
||||||
# true fqsn
|
# true fqsn
|
||||||
fqsn = '.'.join((bfqsn, brokername))
|
fqsn = '.'.join((bfqsn, brokername))
|
||||||
|
|
||||||
# add a fqsn entry that includes the ``.<broker>`` suffix
|
# add a fqsn entry that includes the ``.<broker>`` suffix
|
||||||
|
# and an entry that includes the broker-specific fqsn (including
|
||||||
|
# any new suffixes or elements as injected by the backend).
|
||||||
init_msg[fqsn] = msg
|
init_msg[fqsn] = msg
|
||||||
|
init_msg[bfqsn] = msg
|
||||||
|
|
||||||
# TODO: pretty sure we don't need this? why not just leave 1s as
|
# TODO: pretty sure we don't need this? why not just leave 1s as
|
||||||
# the fastest "sample period" since we'll probably always want that
|
# the fastest "sample period" since we'll probably always want that
|
||||||
|
@ -406,13 +474,14 @@ async def allocate_persistent_feed(
|
||||||
await some_data_ready.wait()
|
await some_data_ready.wait()
|
||||||
|
|
||||||
# append ``.<broker>`` suffix to each quote symbol
|
# append ``.<broker>`` suffix to each quote symbol
|
||||||
bsym = symbol + f'.{brokername}'
|
acceptable_not_fqsn_with_broker_suffix = symbol + f'.{brokername}'
|
||||||
|
|
||||||
generic_first_quotes = {
|
generic_first_quotes = {
|
||||||
bsym: first_quote,
|
acceptable_not_fqsn_with_broker_suffix: first_quote,
|
||||||
fqsn: first_quote,
|
fqsn: first_quote,
|
||||||
}
|
}
|
||||||
|
|
||||||
bus.feeds[symbol] = bus.feeds[fqsn] = (
|
bus.feeds[symbol] = bus.feeds[bfqsn] = (
|
||||||
init_msg,
|
init_msg,
|
||||||
generic_first_quotes,
|
generic_first_quotes,
|
||||||
)
|
)
|
||||||
|
@ -425,9 +494,22 @@ async def allocate_persistent_feed(
|
||||||
if not start_stream:
|
if not start_stream:
|
||||||
await trio.sleep_forever()
|
await trio.sleep_forever()
|
||||||
|
|
||||||
# backend will indicate when real-time quotes have begun.
|
# begin real-time updates of shm and tsb once the feed goes live and
|
||||||
|
# the backend will indicate when real-time quotes have begun.
|
||||||
await feed_is_live.wait()
|
await feed_is_live.wait()
|
||||||
|
|
||||||
|
# start shm incrementer task for OHLC style sampling
|
||||||
|
# at the current detected step period.
|
||||||
|
times = shm.array['time']
|
||||||
|
delay_s = times[-1] - times[times != times[-1]][-1]
|
||||||
|
|
||||||
|
sampler.ohlcv_shms.setdefault(delay_s, []).append(shm)
|
||||||
|
if sampler.incrementers.get(delay_s) is None:
|
||||||
|
await bus.start_task(
|
||||||
|
increment_ohlc_buffer,
|
||||||
|
delay_s,
|
||||||
|
)
|
||||||
|
|
||||||
sum_tick_vlm: bool = init_msg.get(
|
sum_tick_vlm: bool = init_msg.get(
|
||||||
'shm_write_opts', {}
|
'shm_write_opts', {}
|
||||||
).get('sum_tick_vlm', True)
|
).get('sum_tick_vlm', True)
|
||||||
|
@ -450,7 +532,7 @@ async def open_feed_bus(
|
||||||
|
|
||||||
ctx: tractor.Context,
|
ctx: tractor.Context,
|
||||||
brokername: str,
|
brokername: str,
|
||||||
symbol: str,
|
symbol: str, # normally expected to the broker-specific fqsn
|
||||||
loglevel: str,
|
loglevel: str,
|
||||||
tick_throttle: Optional[float] = None,
|
tick_throttle: Optional[float] = None,
|
||||||
start_stream: bool = True,
|
start_stream: bool = True,
|
||||||
|
@ -472,7 +554,9 @@ async def open_feed_bus(
|
||||||
# TODO: check for any stale shm entries for this symbol
|
# TODO: check for any stale shm entries for this symbol
|
||||||
# (after we also group them in a nice `/dev/shm/piker/` subdir).
|
# (after we also group them in a nice `/dev/shm/piker/` subdir).
|
||||||
# ensure we are who we think we are
|
# ensure we are who we think we are
|
||||||
assert 'brokerd' in tractor.current_actor().name
|
servicename = tractor.current_actor().name
|
||||||
|
assert 'brokerd' in servicename
|
||||||
|
assert brokername in servicename
|
||||||
|
|
||||||
bus = get_feed_bus(brokername)
|
bus = get_feed_bus(brokername)
|
||||||
|
|
||||||
|
@ -482,7 +566,7 @@ async def open_feed_bus(
|
||||||
entry = bus.feeds.get(symbol)
|
entry = bus.feeds.get(symbol)
|
||||||
if entry is None:
|
if entry is None:
|
||||||
# allocate a new actor-local stream bus which
|
# allocate a new actor-local stream bus which
|
||||||
# will persist for this `brokerd`.
|
# will persist for this `brokerd`'s service lifetime.
|
||||||
async with bus.task_lock:
|
async with bus.task_lock:
|
||||||
await bus.nursery.start(
|
await bus.nursery.start(
|
||||||
partial(
|
partial(
|
||||||
|
@ -506,12 +590,12 @@ async def open_feed_bus(
|
||||||
init_msg, first_quotes = bus.feeds[symbol]
|
init_msg, first_quotes = bus.feeds[symbol]
|
||||||
|
|
||||||
msg = init_msg[symbol]
|
msg = init_msg[symbol]
|
||||||
bfqsn = msg['fqsn']
|
bfqsn = msg['fqsn'].lower()
|
||||||
|
|
||||||
# true fqsn
|
# true fqsn
|
||||||
fqsn = '.'.join([bfqsn, brokername])
|
fqsn = '.'.join([bfqsn, brokername])
|
||||||
assert fqsn in first_quotes
|
assert fqsn in first_quotes
|
||||||
assert bus.feeds[fqsn]
|
assert bus.feeds[bfqsn]
|
||||||
|
|
||||||
# broker-ambiguous symbol (provided on cli - eg. mnq.globex.ib)
|
# broker-ambiguous symbol (provided on cli - eg. mnq.globex.ib)
|
||||||
bsym = symbol + f'.{brokername}'
|
bsym = symbol + f'.{brokername}'
|
||||||
|
@ -825,7 +909,10 @@ async def maybe_open_feed(
|
||||||
|
|
||||||
**kwargs,
|
**kwargs,
|
||||||
|
|
||||||
) -> (Feed, ReceiveChannel[dict[str, Any]]):
|
) -> (
|
||||||
|
Feed,
|
||||||
|
ReceiveChannel[dict[str, Any]],
|
||||||
|
):
|
||||||
'''
|
'''
|
||||||
Maybe open a data to a ``brokerd`` daemon only if there is no
|
Maybe open a data to a ``brokerd`` daemon only if there is no
|
||||||
local one for the broker-symbol pair, if one is cached use it wrapped
|
local one for the broker-symbol pair, if one is cached use it wrapped
|
||||||
|
@ -846,6 +933,7 @@ async def maybe_open_feed(
|
||||||
'start_stream': kwargs.get('start_stream', True),
|
'start_stream': kwargs.get('start_stream', True),
|
||||||
},
|
},
|
||||||
key=fqsn,
|
key=fqsn,
|
||||||
|
|
||||||
) as (cache_hit, feed):
|
) as (cache_hit, feed):
|
||||||
|
|
||||||
if cache_hit:
|
if cache_hit:
|
||||||
|
|
|
@ -29,14 +29,13 @@ from typing import (
|
||||||
Any,
|
Any,
|
||||||
Optional,
|
Optional,
|
||||||
Union,
|
Union,
|
||||||
# Callable,
|
|
||||||
# TYPE_CHECKING,
|
|
||||||
)
|
)
|
||||||
import time
|
import time
|
||||||
from math import isnan
|
from math import isnan
|
||||||
|
|
||||||
from bidict import bidict
|
from bidict import bidict
|
||||||
import msgpack
|
import msgpack
|
||||||
|
import pyqtgraph as pg
|
||||||
import numpy as np
|
import numpy as np
|
||||||
import pandas as pd
|
import pandas as pd
|
||||||
import tractor
|
import tractor
|
||||||
|
@ -49,15 +48,8 @@ from anyio_marketstore import (
|
||||||
import purerpc
|
import purerpc
|
||||||
|
|
||||||
from .feed import maybe_open_feed
|
from .feed import maybe_open_feed
|
||||||
from ._source import (
|
|
||||||
mk_fqsn,
|
|
||||||
# Symbol,
|
|
||||||
)
|
|
||||||
from ..log import get_logger, get_console_log
|
from ..log import get_logger, get_console_log
|
||||||
|
|
||||||
# if TYPE_CHECKING:
|
|
||||||
# from ._sharedmem import ShmArray
|
|
||||||
|
|
||||||
|
|
||||||
log = get_logger(__name__)
|
log = get_logger(__name__)
|
||||||
|
|
||||||
|
@ -235,6 +227,16 @@ class Storage:
|
||||||
# series' cache from tsdb reads
|
# series' cache from tsdb reads
|
||||||
self._arrays: dict[str, np.ndarray] = {}
|
self._arrays: dict[str, np.ndarray] = {}
|
||||||
|
|
||||||
|
async def list_keys(self) -> list[str]:
|
||||||
|
return await self.client.list_symbols()
|
||||||
|
|
||||||
|
async def search_keys(self, pattern: str) -> list[str]:
|
||||||
|
'''
|
||||||
|
Search for time series key in the storage backend.
|
||||||
|
|
||||||
|
'''
|
||||||
|
...
|
||||||
|
|
||||||
async def write_ticks(self, ticks: list) -> None:
|
async def write_ticks(self, ticks: list) -> None:
|
||||||
...
|
...
|
||||||
|
|
||||||
|
@ -256,14 +258,29 @@ class Storage:
|
||||||
if fqsn not in syms:
|
if fqsn not in syms:
|
||||||
return {}
|
return {}
|
||||||
|
|
||||||
|
tfstr = tf_in_1s[1]
|
||||||
|
|
||||||
|
params = Params(
|
||||||
|
symbols=fqsn,
|
||||||
|
timeframe=tfstr,
|
||||||
|
attrgroup='OHLCV',
|
||||||
|
# limit_from_start=True,
|
||||||
|
|
||||||
|
# TODO: figure the max limit here given the
|
||||||
|
# ``purepc`` msg size limit of purerpc: 33554432
|
||||||
|
limit=int(800e3),
|
||||||
|
)
|
||||||
|
|
||||||
if timeframe is None:
|
if timeframe is None:
|
||||||
log.info(f'starting {fqsn} tsdb granularity scan..')
|
log.info(f'starting {fqsn} tsdb granularity scan..')
|
||||||
# loop through and try to find highest granularity
|
# loop through and try to find highest granularity
|
||||||
for tfstr in tf_in_1s.values():
|
for tfstr in tf_in_1s.values():
|
||||||
try:
|
try:
|
||||||
log.info(f'querying for {tfstr}@{fqsn}')
|
log.info(f'querying for {tfstr}@{fqsn}')
|
||||||
result = await client.query(Params(fqsn, tfstr, 'OHLCV',))
|
params.set('timeframe', tfstr)
|
||||||
|
result = await client.query(params)
|
||||||
break
|
break
|
||||||
|
|
||||||
except purerpc.grpclib.exceptions.UnknownError:
|
except purerpc.grpclib.exceptions.UnknownError:
|
||||||
# XXX: this is already logged by the container and
|
# XXX: this is already logged by the container and
|
||||||
# thus shows up through `marketstored` logs relay.
|
# thus shows up through `marketstored` logs relay.
|
||||||
|
@ -273,9 +290,11 @@ class Storage:
|
||||||
return {}
|
return {}
|
||||||
|
|
||||||
else:
|
else:
|
||||||
tfstr = tf_in_1s[timeframe]
|
result = await client.query(params)
|
||||||
result = await client.query(Params(fqsn, tfstr, 'OHLCV',))
|
|
||||||
|
|
||||||
|
# TODO: it turns out column access on recarrays is actually slower:
|
||||||
|
# https://jakevdp.github.io/PythonDataScienceHandbook/02.09-structured-data-numpy.html#RecordArrays:-Structured-Arrays-with-a-Twist
|
||||||
|
# it might make sense to make these structured arrays?
|
||||||
# Fill out a `numpy` array-results map
|
# Fill out a `numpy` array-results map
|
||||||
arrays = {}
|
arrays = {}
|
||||||
for fqsn, data_set in result.by_symbols().items():
|
for fqsn, data_set in result.by_symbols().items():
|
||||||
|
@ -283,7 +302,22 @@ class Storage:
|
||||||
tf_in_1s.inverse[data_set.timeframe]
|
tf_in_1s.inverse[data_set.timeframe]
|
||||||
] = data_set.array
|
] = data_set.array
|
||||||
|
|
||||||
return arrays[fqsn][timeframe] if timeframe else arrays
|
return arrays[fqsn][timeframe] if timeframe else arrays[fqsn]
|
||||||
|
|
||||||
|
async def delete_ts(
|
||||||
|
self,
|
||||||
|
key: str,
|
||||||
|
timeframe: Optional[Union[int, str]] = None,
|
||||||
|
|
||||||
|
) -> bool:
|
||||||
|
|
||||||
|
client = self.client
|
||||||
|
syms = await client.list_symbols()
|
||||||
|
print(syms)
|
||||||
|
# if key not in syms:
|
||||||
|
# raise KeyError(f'`{fqsn}` table key not found?')
|
||||||
|
|
||||||
|
return await client.destroy(tbk=key)
|
||||||
|
|
||||||
|
|
||||||
@acm
|
@acm
|
||||||
|
@ -296,19 +330,16 @@ async def open_storage_client(
|
||||||
Load a series by key and deliver in ``numpy`` struct array format.
|
Load a series by key and deliver in ``numpy`` struct array format.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
async with get_client() as client:
|
async with (
|
||||||
|
# eventually a storage backend endpoint
|
||||||
storage_client = Storage(client)
|
get_client() as client,
|
||||||
arrays = await storage_client.read_ohlcv(
|
):
|
||||||
fqsn,
|
# slap on our wrapper api
|
||||||
period,
|
yield Storage(client)
|
||||||
)
|
|
||||||
|
|
||||||
yield storage_client, arrays
|
|
||||||
|
|
||||||
|
|
||||||
async def backfill_history_diff(
|
async def tsdb_history_update(
|
||||||
# symbol: Symbol
|
fqsn: str,
|
||||||
|
|
||||||
) -> list[str]:
|
) -> list[str]:
|
||||||
|
|
||||||
|
@ -338,108 +369,95 @@ async def backfill_history_diff(
|
||||||
# * the original data feed arch blurb:
|
# * the original data feed arch blurb:
|
||||||
# - https://github.com/pikers/piker/issues/98
|
# - https://github.com/pikers/piker/issues/98
|
||||||
#
|
#
|
||||||
|
profiler = pg.debug.Profiler(
|
||||||
broker = 'ib'
|
disabled=False, # not pg_profile_enabled(),
|
||||||
symbol = 'mnq.globex'
|
delayed=False,
|
||||||
|
)
|
||||||
# broker = 'binance'
|
|
||||||
# symbol = 'btcusdt'
|
|
||||||
|
|
||||||
fqsn = mk_fqsn(broker, symbol)
|
|
||||||
|
|
||||||
async with (
|
async with (
|
||||||
get_client() as client,
|
open_storage_client(fqsn) as storage,
|
||||||
|
|
||||||
maybe_open_feed(
|
maybe_open_feed(
|
||||||
broker,
|
[fqsn],
|
||||||
[symbol],
|
|
||||||
loglevel='info',
|
|
||||||
# backpressure=False,
|
|
||||||
start_stream=False,
|
start_stream=False,
|
||||||
|
|
||||||
) as (feed, stream),
|
) as (feed, stream),
|
||||||
):
|
):
|
||||||
syms = await client.list_symbols()
|
profiler(f'opened feed for {fqsn}')
|
||||||
log.info(f'Existing symbol set:\n{pformat(syms)}')
|
|
||||||
|
symbol = feed.symbols.get(fqsn)
|
||||||
|
if symbol:
|
||||||
|
fqsn = symbol.front_fqsn()
|
||||||
|
|
||||||
|
syms = await storage.client.list_symbols()
|
||||||
|
log.info(f'Existing tsdb symbol set:\n{pformat(syms)}')
|
||||||
|
profiler(f'listed symbols {syms}')
|
||||||
|
|
||||||
# diff db history with shm and only write the missing portions
|
# diff db history with shm and only write the missing portions
|
||||||
ohlcv = feed.shm.array
|
ohlcv = feed.shm.array
|
||||||
|
|
||||||
key = (fqsn, '1Sec', 'OHLCV')
|
# TODO: use pg profiler
|
||||||
tbk = mk_tbk(key)
|
tsdb_arrays = await storage.read_ohlcv(fqsn)
|
||||||
|
|
||||||
# diff vs. existing array and append new history
|
to_append = feed.shm.array
|
||||||
# TODO:
|
to_prepend = None
|
||||||
|
|
||||||
# TODO: should be no error?
|
# hist diffing
|
||||||
# assert not resp.responses
|
if tsdb_arrays:
|
||||||
|
onesec = tsdb_arrays[1]
|
||||||
|
to_append = ohlcv[ohlcv['time'] > onesec['Epoch'][-1]]
|
||||||
|
to_prepend = ohlcv[ohlcv['time'] < onesec['Epoch'][0]]
|
||||||
|
|
||||||
start = time.time()
|
profiler('Finished db arrays diffs')
|
||||||
|
|
||||||
qr = await client.query(
|
for array in [to_append, to_prepend]:
|
||||||
# Params(fqsn, '1Sec`', 'OHLCV',)
|
if array is None:
|
||||||
Params(*key),
|
continue
|
||||||
)
|
|
||||||
# # Dig out `numpy` results map
|
|
||||||
arrays: dict[tuple[str, int], np.ndarray] = {}
|
|
||||||
for name, data_set in qr.by_symbols().items():
|
|
||||||
in_secs = tf_in_1s.inverse[data_set.timeframe]
|
|
||||||
arrays[(name, in_secs)] = data_set.array
|
|
||||||
|
|
||||||
s1 = arrays[(fqsn, 1)]
|
log.info(
|
||||||
to_append = ohlcv[ohlcv['time'] > s1['Epoch'][-1]]
|
f'Writing datums {array.size} -> to tsdb from shm\n'
|
||||||
|
)
|
||||||
|
|
||||||
end_diff = time.time()
|
# build mkts schema compat array for writing
|
||||||
diff_ms = round((end_diff - start) * 1e3, ndigits=2)
|
mkts_dt = np.dtype(_ohlcv_dt)
|
||||||
|
mkts_array = np.zeros(
|
||||||
|
len(array),
|
||||||
|
dtype=mkts_dt,
|
||||||
|
)
|
||||||
|
# copy from shm array (yes it's this easy):
|
||||||
|
# https://numpy.org/doc/stable/user/basics.rec.html#assignment-from-other-structured-arrays
|
||||||
|
mkts_array[:] = array[[
|
||||||
|
'time',
|
||||||
|
'open',
|
||||||
|
'high',
|
||||||
|
'low',
|
||||||
|
'close',
|
||||||
|
'volume',
|
||||||
|
]]
|
||||||
|
|
||||||
log.info(
|
# write to db
|
||||||
f'Appending {to_append.size} datums to tsdb from shm\n'
|
resp = await storage.client.write(
|
||||||
f'Total diff time: {diff_ms} ms'
|
mkts_array,
|
||||||
)
|
tbk=f'{fqsn}/1Sec/OHLCV',
|
||||||
|
|
||||||
# build mkts schema compat array for writing
|
# NOTE: will will append duplicates
|
||||||
mkts_dt = np.dtype(_ohlcv_dt)
|
# for the same timestamp-index.
|
||||||
mkts_array = np.zeros(
|
# TODO: pre deduplicate?
|
||||||
len(to_append),
|
isvariablelength=True,
|
||||||
dtype=mkts_dt,
|
)
|
||||||
)
|
|
||||||
# copy from shm array (yes it's this easy):
|
|
||||||
# https://numpy.org/doc/stable/user/basics.rec.html#assignment-from-other-structured-arrays
|
|
||||||
mkts_array[:] = to_append[[
|
|
||||||
'time',
|
|
||||||
'open',
|
|
||||||
'high',
|
|
||||||
'low',
|
|
||||||
'close',
|
|
||||||
'volume',
|
|
||||||
]]
|
|
||||||
|
|
||||||
# write to db
|
log.info(
|
||||||
resp = await client.write(
|
f'Wrote {to_append.size} datums to tsdb\n'
|
||||||
mkts_array,
|
)
|
||||||
tbk=tbk,
|
profiler('Finished db writes')
|
||||||
# NOTE: will will append duplicates
|
|
||||||
# for the same timestamp-index.
|
|
||||||
isvariablelength=True,
|
|
||||||
)
|
|
||||||
end_write = time.time()
|
|
||||||
diff_ms = round((end_write - end_diff) * 1e3, ndigits=2)
|
|
||||||
log.info(
|
|
||||||
f'Wrote {to_append.size} datums to tsdb\n'
|
|
||||||
f'Total write time: {diff_ms} ms'
|
|
||||||
)
|
|
||||||
for resp in resp.responses:
|
|
||||||
err = resp.error
|
|
||||||
if err:
|
|
||||||
raise MarketStoreError(err)
|
|
||||||
|
|
||||||
# TODO: backfiller loop
|
for resp in resp.responses:
|
||||||
from piker.ui._compression import downsample
|
err = resp.error
|
||||||
x, y = downsample(
|
if err:
|
||||||
s1['Epoch'],
|
raise MarketStoreError(err)
|
||||||
s1['Close'],
|
|
||||||
bins=10,
|
from tractor.trionics import ipython_embed
|
||||||
)
|
await ipython_embed()
|
||||||
await tractor.breakpoint()
|
|
||||||
|
|
||||||
|
|
||||||
async def ingest_quote_stream(
|
async def ingest_quote_stream(
|
||||||
|
|
|
@ -54,7 +54,7 @@ for name in win_names:
|
||||||
# disconnect?
|
# disconnect?
|
||||||
for key_combo, timeout in [
|
for key_combo, timeout in [
|
||||||
# only required if we need a connection reset.
|
# only required if we need a connection reset.
|
||||||
('ctrl+alt+r', 12),
|
# ('ctrl+alt+r', 12),
|
||||||
# data feed reset.
|
# data feed reset.
|
||||||
('ctrl+alt+f', 6)
|
('ctrl+alt+f', 6)
|
||||||
]:
|
]:
|
||||||
|
|
Loading…
Reference in New Issue