First draft storage layer cli
Adds a `piker storage` subcmd with a `-d` flag to wipe a particular fqsn's time series (both 1s and 60s). Obviously this needs to be extended much more but provides a start point.storage_cli
parent
4379bfe760
commit
58f39d1829
|
@ -19,7 +19,10 @@ marketstore cli.
|
|||
|
||||
"""
|
||||
from functools import partial
|
||||
from pprint import pformat
|
||||
from pprint import (
|
||||
pformat,
|
||||
pprint,
|
||||
)
|
||||
|
||||
from anyio_marketstore import open_marketstore_client
|
||||
import trio
|
||||
|
@ -113,15 +116,11 @@ def ms_stream(
|
|||
|
||||
@cli.command()
|
||||
@click.option(
|
||||
'--tl',
|
||||
is_flag=True,
|
||||
help='Enable tractor logging')
|
||||
@click.option(
|
||||
'--host',
|
||||
'--tsdb_host',
|
||||
default='localhost'
|
||||
)
|
||||
@click.option(
|
||||
'--port',
|
||||
'--tsdb_port',
|
||||
default=5993
|
||||
)
|
||||
@click.argument('symbols', nargs=-1)
|
||||
|
@ -137,7 +136,7 @@ def storesh(
|
|||
Start an IPython shell ready to query the local marketstore db.
|
||||
|
||||
'''
|
||||
from piker.data.marketstore import tsdb_history_update
|
||||
from piker.data.marketstore import open_tsdb_client
|
||||
from piker._daemon import open_piker_runtime
|
||||
|
||||
async def main():
|
||||
|
@ -148,7 +147,63 @@ def storesh(
|
|||
enable_modules=['piker.data._ahab'],
|
||||
):
|
||||
symbol = symbols[0]
|
||||
await tsdb_history_update(symbol)
|
||||
|
||||
async with open_tsdb_client(symbol) as storage:
|
||||
# TODO: ask if user wants to write history for detected
|
||||
# available shm buffers?
|
||||
from tractor.trionics import ipython_embed
|
||||
await ipython_embed()
|
||||
|
||||
trio.run(main)
|
||||
|
||||
|
||||
@cli.command()
|
||||
@click.option(
|
||||
'--host',
|
||||
default='localhost'
|
||||
)
|
||||
@click.option(
|
||||
'--port',
|
||||
default=5993
|
||||
)
|
||||
@click.option(
|
||||
'--delete',
|
||||
'-d',
|
||||
is_flag=True,
|
||||
help='Delete history (1 Min) for symbol(s)',
|
||||
)
|
||||
@click.argument('symbols', nargs=-1)
|
||||
@click.pass_obj
|
||||
def storage(
|
||||
config,
|
||||
host,
|
||||
port,
|
||||
symbols: list[str],
|
||||
delete: bool,
|
||||
|
||||
):
|
||||
'''
|
||||
Start an IPython shell ready to query the local marketstore db.
|
||||
|
||||
'''
|
||||
from piker.data.marketstore import open_tsdb_client
|
||||
from piker._daemon import open_piker_runtime
|
||||
|
||||
async def main():
|
||||
nonlocal symbols
|
||||
|
||||
async with open_piker_runtime(
|
||||
'tsdb_storage',
|
||||
enable_modules=['piker.data._ahab'],
|
||||
):
|
||||
symbol = symbols[0]
|
||||
async with open_tsdb_client(symbol) as storage:
|
||||
if delete:
|
||||
for fqsn in symbols:
|
||||
syms = await storage.client.list_symbols()
|
||||
breakpoint()
|
||||
await storage.delete_ts(fqsn, 60)
|
||||
await storage.delete_ts(fqsn, 1)
|
||||
|
||||
trio.run(main)
|
||||
|
||||
|
|
|
@ -510,7 +510,6 @@ class Storage:
|
|||
|
||||
client = self.client
|
||||
syms = await client.list_symbols()
|
||||
print(syms)
|
||||
if key not in syms:
|
||||
raise KeyError(f'`{key}` table key not found in\n{syms}?')
|
||||
|
||||
|
@ -627,10 +626,10 @@ async def open_storage_client(
|
|||
yield Storage(client)
|
||||
|
||||
|
||||
async def tsdb_history_update(
|
||||
fqsn: Optional[str] = None,
|
||||
|
||||
) -> list[str]:
|
||||
@acm
|
||||
async def open_tsdb_client(
|
||||
fqsn: str,
|
||||
) -> Storage:
|
||||
|
||||
# TODO: real-time dedicated task for ensuring
|
||||
# history consistency between the tsdb, shm and real-time feed..
|
||||
|
@ -659,7 +658,7 @@ async def tsdb_history_update(
|
|||
# - https://github.com/pikers/piker/issues/98
|
||||
#
|
||||
profiler = Profiler(
|
||||
disabled=False, # not pg_profile_enabled(),
|
||||
disabled=True, # not pg_profile_enabled(),
|
||||
delayed=False,
|
||||
)
|
||||
|
||||
|
@ -701,13 +700,9 @@ async def tsdb_history_update(
|
|||
# profiler('Finished db arrays diffs')
|
||||
|
||||
syms = await storage.client.list_symbols()
|
||||
log.info(f'Existing tsdb symbol set:\n{pformat(syms)}')
|
||||
profiler(f'listed symbols {syms}')
|
||||
|
||||
# TODO: ask if user wants to write history for detected
|
||||
# available shm buffers?
|
||||
from tractor.trionics import ipython_embed
|
||||
await ipython_embed()
|
||||
# log.info(f'Existing tsdb symbol set:\n{pformat(syms)}')
|
||||
# profiler(f'listed symbols {syms}')
|
||||
yield storage
|
||||
|
||||
# for array in [to_append, to_prepend]:
|
||||
# if array is None:
|
||||
|
|
Loading…
Reference in New Issue