From 58f39d182979cc17c5c861e29b69144d102cd21a Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 29 Jan 2023 15:17:39 -0500 Subject: [PATCH] First draft storage layer cli Adds a `piker storage` subcmd with a `-d` flag to wipe a particular fqsn's time series (both 1s and 60s). Obviously this needs to be extended much more but provides a start point. --- piker/data/cli.py | 73 ++++++++++++++++++++++++++++++++++----- piker/data/marketstore.py | 23 +++++------- 2 files changed, 73 insertions(+), 23 deletions(-) diff --git a/piker/data/cli.py b/piker/data/cli.py index 554048a4..994b9da4 100644 --- a/piker/data/cli.py +++ b/piker/data/cli.py @@ -19,7 +19,10 @@ marketstore cli. """ from functools import partial -from pprint import pformat +from pprint import ( + pformat, + pprint, +) from anyio_marketstore import open_marketstore_client import trio @@ -113,15 +116,11 @@ def ms_stream( @cli.command() @click.option( - '--tl', - is_flag=True, - help='Enable tractor logging') -@click.option( - '--host', + '--tsdb_host', default='localhost' ) @click.option( - '--port', + '--tsdb_port', default=5993 ) @click.argument('symbols', nargs=-1) @@ -137,7 +136,7 @@ def storesh( Start an IPython shell ready to query the local marketstore db. ''' - from piker.data.marketstore import tsdb_history_update + from piker.data.marketstore import open_tsdb_client from piker._daemon import open_piker_runtime async def main(): @@ -148,7 +147,63 @@ def storesh( enable_modules=['piker.data._ahab'], ): symbol = symbols[0] - await tsdb_history_update(symbol) + + async with open_tsdb_client(symbol) as storage: + # TODO: ask if user wants to write history for detected + # available shm buffers? + from tractor.trionics import ipython_embed + await ipython_embed() + + trio.run(main) + + +@cli.command() +@click.option( + '--host', + default='localhost' +) +@click.option( + '--port', + default=5993 +) +@click.option( + '--delete', + '-d', + is_flag=True, + help='Delete history (1 Min) for symbol(s)', +) +@click.argument('symbols', nargs=-1) +@click.pass_obj +def storage( + config, + host, + port, + symbols: list[str], + delete: bool, + +): + ''' + Start an IPython shell ready to query the local marketstore db. + + ''' + from piker.data.marketstore import open_tsdb_client + from piker._daemon import open_piker_runtime + + async def main(): + nonlocal symbols + + async with open_piker_runtime( + 'tsdb_storage', + enable_modules=['piker.data._ahab'], + ): + symbol = symbols[0] + async with open_tsdb_client(symbol) as storage: + if delete: + for fqsn in symbols: + syms = await storage.client.list_symbols() + breakpoint() + await storage.delete_ts(fqsn, 60) + await storage.delete_ts(fqsn, 1) trio.run(main) diff --git a/piker/data/marketstore.py b/piker/data/marketstore.py index 190667d6..792396e3 100644 --- a/piker/data/marketstore.py +++ b/piker/data/marketstore.py @@ -510,7 +510,6 @@ class Storage: client = self.client syms = await client.list_symbols() - print(syms) if key not in syms: raise KeyError(f'`{key}` table key not found in\n{syms}?') @@ -627,10 +626,10 @@ async def open_storage_client( yield Storage(client) -async def tsdb_history_update( - fqsn: Optional[str] = None, - -) -> list[str]: +@acm +async def open_tsdb_client( + fqsn: str, +) -> Storage: # TODO: real-time dedicated task for ensuring # history consistency between the tsdb, shm and real-time feed.. @@ -659,7 +658,7 @@ async def tsdb_history_update( # - https://github.com/pikers/piker/issues/98 # profiler = Profiler( - disabled=False, # not pg_profile_enabled(), + disabled=True, # not pg_profile_enabled(), delayed=False, ) @@ -700,14 +699,10 @@ async def tsdb_history_update( # profiler('Finished db arrays diffs') - syms = await storage.client.list_symbols() - log.info(f'Existing tsdb symbol set:\n{pformat(syms)}') - profiler(f'listed symbols {syms}') - - # TODO: ask if user wants to write history for detected - # available shm buffers? - from tractor.trionics import ipython_embed - await ipython_embed() + syms = await storage.client.list_symbols() + # log.info(f'Existing tsdb symbol set:\n{pformat(syms)}') + # profiler(f'listed symbols {syms}') + yield storage # for array in [to_append, to_prepend]: # if array is None: