Drop `ms-shell`, add `piker storesh` cmd
parent
edcac1e768
commit
85e2602d2e
|
@ -126,7 +126,7 @@ def ms_stream(
|
||||||
)
|
)
|
||||||
@click.argument('symbols', nargs=-1)
|
@click.argument('symbols', nargs=-1)
|
||||||
@click.pass_obj
|
@click.pass_obj
|
||||||
def ms_shell(
|
def storesh(
|
||||||
config,
|
config,
|
||||||
tl,
|
tl,
|
||||||
host,
|
host,
|
||||||
|
@ -137,43 +137,18 @@ def ms_shell(
|
||||||
Start an IPython shell ready to query the local marketstore db.
|
Start an IPython shell ready to query the local marketstore db.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
from piker.data.marketstore import backfill_history_diff
|
from piker.data.marketstore import tsdb_history_update
|
||||||
from piker._daemon import open_piker_runtime
|
from piker._daemon import open_piker_runtime
|
||||||
|
|
||||||
async def main():
|
async def main():
|
||||||
nonlocal symbols
|
nonlocal symbols
|
||||||
|
|
||||||
async with open_piker_runtime(
|
async with open_piker_runtime(
|
||||||
'ms_shell',
|
'storesh',
|
||||||
enable_modules=['piker.data._ahab'],
|
enable_modules=['piker.data._ahab'],
|
||||||
):
|
):
|
||||||
try:
|
symbol = symbols[0]
|
||||||
await backfill_history_diff()
|
await tsdb_history_update(symbol)
|
||||||
except OSError:
|
|
||||||
# TODO: write magics to query marketstore
|
|
||||||
|
|
||||||
sym = symbols[0]
|
|
||||||
symbol, _, broker = sym.rpartition('.')
|
|
||||||
# (maybe) allocate shm array for this broker/symbol which will
|
|
||||||
# be used for fast near-term history capture and processing.
|
|
||||||
shm, opened = maybe_open_shm_array(
|
|
||||||
key=sym,
|
|
||||||
dtype=base_iohlc_dtype,
|
|
||||||
)
|
|
||||||
# load anything found in shm
|
|
||||||
from numpy.lib.recfunctions import structured_to_unstructured
|
|
||||||
mxmn = structured_to_unstructured(
|
|
||||||
shm.array[['low', 'high']],
|
|
||||||
# dtype=[('mxmn', '<f8'), ('index', '<i8')],
|
|
||||||
).flatten()
|
|
||||||
|
|
||||||
from piker.ui._compression import downsample
|
|
||||||
xd, yd = downsample(
|
|
||||||
y=mxmn,
|
|
||||||
x=np.arange(len(mxmn)),
|
|
||||||
bins=4,
|
|
||||||
)
|
|
||||||
await tractor.breakpoint()
|
|
||||||
|
|
||||||
trio.run(main)
|
trio.run(main)
|
||||||
|
|
||||||
|
@ -184,10 +159,11 @@ def ms_shell(
|
||||||
@click.argument('name', nargs=1, required=True)
|
@click.argument('name', nargs=1, required=True)
|
||||||
@click.pass_obj
|
@click.pass_obj
|
||||||
def ingest(config, name, test_file, tl):
|
def ingest(config, name, test_file, tl):
|
||||||
"""Ingest real-time broker quotes and ticks to a marketstore instance.
|
'''
|
||||||
"""
|
Ingest real-time broker quotes and ticks to a marketstore instance.
|
||||||
|
|
||||||
|
'''
|
||||||
# global opts
|
# global opts
|
||||||
brokermods = config['brokermods']
|
|
||||||
loglevel = config['loglevel']
|
loglevel = config['loglevel']
|
||||||
tractorloglevel = config['tractorloglevel']
|
tractorloglevel = config['tractorloglevel']
|
||||||
# log = config['log']
|
# log = config['log']
|
||||||
|
|
|
@ -444,6 +444,9 @@ async def tsdb_history_update(
|
||||||
if err:
|
if err:
|
||||||
raise MarketStoreError(err)
|
raise MarketStoreError(err)
|
||||||
|
|
||||||
|
from tractor.trionics import ipython_embed
|
||||||
|
await ipython_embed()
|
||||||
|
|
||||||
|
|
||||||
async def ingest_quote_stream(
|
async def ingest_quote_stream(
|
||||||
symbols: list[str],
|
symbols: list[str],
|
||||||
|
|
Loading…
Reference in New Issue