Drop `ms-shell`, add `piker storesh` cmd

l1_precision_fix
Tyler Goodlet 2022-03-29 13:33:43 -04:00
parent ca48577c60
commit a971de2b67
2 changed files with 12 additions and 33 deletions

View File

@ -126,7 +126,7 @@ def ms_stream(
)
@click.argument('symbols', nargs=-1)
@click.pass_obj
def ms_shell(
def storesh(
config,
tl,
host,
@ -137,43 +137,18 @@ def ms_shell(
Start an IPython shell ready to query the local marketstore db.
'''
from piker.data.marketstore import backfill_history_diff
from piker.data.marketstore import tsdb_history_update
from piker._daemon import open_piker_runtime
async def main():
nonlocal symbols
async with open_piker_runtime(
'ms_shell',
'storesh',
enable_modules=['piker.data._ahab'],
):
try:
await backfill_history_diff()
except OSError:
# TODO: write magics to query marketstore
sym = symbols[0]
symbol, _, broker = sym.rpartition('.')
# (maybe) allocate shm array for this broker/symbol which will
# be used for fast near-term history capture and processing.
shm, opened = maybe_open_shm_array(
key=sym,
dtype=base_iohlc_dtype,
)
# load anything found in shm
from numpy.lib.recfunctions import structured_to_unstructured
mxmn = structured_to_unstructured(
shm.array[['low', 'high']],
# dtype=[('mxmn', '<f8'), ('index', '<i8')],
).flatten()
from piker.ui._compression import downsample
xd, yd = downsample(
y=mxmn,
x=np.arange(len(mxmn)),
bins=4,
)
await tractor.breakpoint()
symbol = symbols[0]
await tsdb_history_update(symbol)
trio.run(main)
@ -184,10 +159,11 @@ def ms_shell(
@click.argument('name', nargs=1, required=True)
@click.pass_obj
def ingest(config, name, test_file, tl):
"""Ingest real-time broker quotes and ticks to a marketstore instance.
"""
'''
Ingest real-time broker quotes and ticks to a marketstore instance.
'''
# global opts
brokermods = config['brokermods']
loglevel = config['loglevel']
tractorloglevel = config['tractorloglevel']
# log = config['log']

View File

@ -444,6 +444,9 @@ async def tsdb_history_update(
if err:
raise MarketStoreError(err)
from tractor.trionics import ipython_embed
await ipython_embed()
async def ingest_quote_stream(
symbols: list[str],