First draft storage layer cli

Adds a `piker storage` subcmd with a `-d` flag to wipe a particular
fqsn's time series (both 1s and 60s). Obviously this needs to be
extended much more but provides a start point.
service_subpkg
Tyler Goodlet 2023-01-29 15:17:39 -05:00
parent dae8e59d26
commit fe0695fb7b
2 changed files with 73 additions and 23 deletions

View File

@ -19,7 +19,10 @@ marketstore cli.
"""
from functools import partial
from pprint import pformat
from pprint import (
pformat,
pprint,
)
from anyio_marketstore import open_marketstore_client
import trio
@ -113,15 +116,11 @@ def ms_stream(
@cli.command()
@click.option(
'--tl',
is_flag=True,
help='Enable tractor logging')
@click.option(
'--host',
'--tsdb_host',
default='localhost'
)
@click.option(
'--port',
'--tsdb_port',
default=5993
)
@click.argument('symbols', nargs=-1)
@ -137,7 +136,7 @@ def storesh(
Start an IPython shell ready to query the local marketstore db.
'''
from piker.data.marketstore import tsdb_history_update
from piker.data.marketstore import open_tsdb_client
from piker._daemon import open_piker_runtime
async def main():
@ -148,7 +147,63 @@ def storesh(
enable_modules=['piker.data._ahab'],
):
symbol = symbols[0]
await tsdb_history_update(symbol)
async with open_tsdb_client(symbol) as storage:
# TODO: ask if user wants to write history for detected
# available shm buffers?
from tractor.trionics import ipython_embed
await ipython_embed()
trio.run(main)
@cli.command()
@click.option(
'--host',
default='localhost'
)
@click.option(
'--port',
default=5993
)
@click.option(
'--delete',
'-d',
is_flag=True,
help='Delete history (1 Min) for symbol(s)',
)
@click.argument('symbols', nargs=-1)
@click.pass_obj
def storage(
config,
host,
port,
symbols: list[str],
delete: bool,
):
'''
Start an IPython shell ready to query the local marketstore db.
'''
from piker.data.marketstore import open_tsdb_client
from piker._daemon import open_piker_runtime
async def main():
nonlocal symbols
async with open_piker_runtime(
'tsdb_storage',
enable_modules=['piker.data._ahab'],
):
symbol = symbols[0]
async with open_tsdb_client(symbol) as storage:
if delete:
for fqsn in symbols:
syms = await storage.client.list_symbols()
breakpoint()
await storage.delete_ts(fqsn, 60)
await storage.delete_ts(fqsn, 1)
trio.run(main)

View File

@ -510,7 +510,6 @@ class Storage:
client = self.client
syms = await client.list_symbols()
print(syms)
if key not in syms:
raise KeyError(f'`{key}` table key not found in\n{syms}?')
@ -627,10 +626,10 @@ async def open_storage_client(
yield Storage(client)
async def tsdb_history_update(
fqsn: Optional[str] = None,
) -> list[str]:
@acm
async def open_tsdb_client(
fqsn: str,
) -> Storage:
# TODO: real-time dedicated task for ensuring
# history consistency between the tsdb, shm and real-time feed..
@ -659,7 +658,7 @@ async def tsdb_history_update(
# - https://github.com/pikers/piker/issues/98
#
profiler = Profiler(
disabled=False, # not pg_profile_enabled(),
disabled=True, # not pg_profile_enabled(),
delayed=False,
)
@ -700,14 +699,10 @@ async def tsdb_history_update(
# profiler('Finished db arrays diffs')
syms = await storage.client.list_symbols()
log.info(f'Existing tsdb symbol set:\n{pformat(syms)}')
profiler(f'listed symbols {syms}')
# TODO: ask if user wants to write history for detected
# available shm buffers?
from tractor.trionics import ipython_embed
await ipython_embed()
syms = await storage.client.list_symbols()
# log.info(f'Existing tsdb symbol set:\n{pformat(syms)}')
# profiler(f'listed symbols {syms}')
yield storage
# for array in [to_append, to_prepend]:
# if array is None: