From cb774e5a5de7ce6e2f4d1f270ac58a11e6efe6ba Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 29 May 2023 17:41:40 -0400 Subject: [PATCH] Re-implement `piker store` CLI with `typer` Turns out you can mix and match `click` with `typer` so this moves what was the `.data.cli` stuff into `storage.cli` and uses the integration api to make it all work B) New subcmd: `piker store` - add `piker store ls` which lists all fqme keyed time-series from backend. - add `store delete` to remove any such key->time-series. - now uses a nursery for multi-timeframe concurrency B) Mask out all the old `marketstore` specific subcmds for now (streaming, ingest, storesh, etc..) in anticipation of moving them into a subpkg-module and make sure to import the sub-cmd module in our top level cli package. Other `.storage` api tweaks: - drop the reraising with custom error (for now). - rename `Storage` -> `StorageClient` (or should it be API?). --- piker/cli/__init__.py | 9 +- piker/storage/__init__.py | 41 ++-- piker/storage/cli.py | 386 ++++++++++++++++++++--------------- piker/storage/marketstore.py | 13 +- 4 files changed, 265 insertions(+), 184 deletions(-) diff --git a/piker/cli/__init__.py b/piker/cli/__init__.py index 70610135..a51fab3a 100644 --- a/piker/cli/__init__.py +++ b/piker/cli/__init__.py @@ -154,6 +154,8 @@ def cli( assert os.path.isdir(configdir), f"`{configdir}` is not a valid path" config._override_config_dir(configdir) + # TODO: for typer see + # https://typer.tiangolo.com/tutorial/commands/context/ ctx.ensure_object(dict) if not brokers: @@ -227,12 +229,15 @@ def services(config, tl, ports): def _load_clis() -> None: from ..service import marketstore # noqa - from ..service import elastic - from ..data import cli # noqa + from ..service import elastic # noqa from ..brokers import cli # noqa from ..ui import cli # noqa from ..watchlists import cli # noqa + # typer implemented + from ..storage import cli # noqa + from ..accounting import cli # noqa + # load downstream cli modules _load_clis() diff --git a/piker/storage/__init__.py b/piker/storage/__init__.py index 3aeefc37..21e258a6 100644 --- a/piker/storage/__init__.py +++ b/piker/storage/__init__.py @@ -61,7 +61,12 @@ get_console_log = partial( ) -class Storage( +__tsdbs__: list[str] = [ + 'marketstore', +] + + +class StorageClient( Protocol, ): ''' @@ -69,6 +74,8 @@ class Storage( in order to suffice the historical data mgmt layer. ''' + name: str + @abstractmethod async def list_keys(self) -> list[str]: ... @@ -131,7 +138,7 @@ class Storage( ... -class StorageConnectionError(ConnectionError): +class StorageConnectionError(ConnectionError): ''' Can't connect to the desired tsdb subsys/service. @@ -152,12 +159,14 @@ def get_storagemod(name: str) -> ModuleType: async def open_storage_client( name: str | None = None, -) -> tuple[ModuleType, Storage]: +) -> tuple[ModuleType, StorageClient]: ''' - Load the ``Storage`` client for named backend. + Load the ``StorageClient`` for named backend. ''' - # load root config for tsdb + tsdb_host: str = 'localhost' + + # load root config and any tsdb user defined settings conf, path = config.load('conf', touch_if_dne=True) net = conf.get('network') if net: @@ -185,17 +194,17 @@ async def open_storage_client( else: log.info(f'Attempting to connect to remote {name}@{tsdbconf}') - try: - async with ( - get_client(**tsdbconf) as client, - ): - # slap on our wrapper api - yield mod, client + # try: + async with ( + get_client(**tsdbconf) as client, + ): + # slap on our wrapper api + yield mod, client - except Exception as err: - raise StorageConnectionError( - f'No connection to {name}' - ) from err + # except Exception as err: + # raise StorageConnectionError( + # f'No connection to {name}' + # ) from err # NOTE: pretty sure right now this is only being @@ -203,7 +212,7 @@ async def open_storage_client( @acm async def open_tsdb_client( fqme: str, -) -> Storage: +) -> StorageClient: # TODO: real-time dedicated task for ensuring # history consistency between the tsdb, shm and real-time feed.. diff --git a/piker/storage/cli.py b/piker/storage/cli.py index f855717b..d2148109 100644 --- a/piker/storage/cli.py +++ b/piker/storage/cli.py @@ -18,9 +18,14 @@ marketstore cli. """ +from __future__ import annotations +from typing import TYPE_CHECKING +# import tractor import trio -import tractor -import click +# import click +from rich.console import Console +# from rich.markdown import Markdown +import typer from ..service.marketstore import ( # get_client, @@ -32,35 +37,40 @@ from ..service.marketstore import ( ) from ..cli import cli from .. import watchlists as wl -from ._util import ( +from . import ( log, ) -@cli.command() -@click.option( - '--url', - default='ws://localhost:5993/ws', - help='HTTP URL of marketstore instance' -) -@click.argument('names', nargs=-1) -@click.pass_obj -def ms_stream( - config: dict, - names: list[str], - url: str, -) -> None: - ''' - Connect to a marketstore time bucket stream for (a set of) symbols(s) - and print to console. +if TYPE_CHECKING: + from . import Storage - ''' - async def main(): - # async for quote in stream_quotes(symbols=names): - # log.info(f"Received quote:\n{quote}") - ... +store = typer.Typer() - trio.run(main) +# @cli.command() +# @click.option( +# '--url', +# default='ws://localhost:5993/ws', +# help='HTTP URL of marketstore instance' +# ) +# @click.argument('names', nargs=-1) +# @click.pass_obj +# def ms_stream( +# config: dict, +# names: list[str], +# url: str, +# ) -> None: +# ''' +# Connect to a marketstore time bucket stream for (a set of) symbols(s) +# and print to console. + +# ''' +# async def main(): +# # async for quote in stream_quotes(symbols=names): +# # log.info(f"Received quote:\n{quote}") +# ... + +# trio.run(main) # @cli.command() @@ -99,157 +109,213 @@ def ms_stream( # tractor.run(main) -@cli.command() -@click.option( - '--tsdb_host', - default='localhost' -) -@click.option( - '--tsdb_port', - default=5993 -) -@click.argument('symbols', nargs=-1) -@click.pass_obj -def storesh( - config, - tl, - host, - port, +# @cli.command() +# @click.option( +# '--tsdb_host', +# default='localhost' +# ) +# @click.option( +# '--tsdb_port', +# default=5993 +# ) +# @click.argument('symbols', nargs=-1) +# @click.pass_obj +# def storesh( +# config, +# tl, +# host, +# port, +# symbols: list[str], +# ): +# ''' +# Start an IPython shell ready to query the local marketstore db. + +# ''' +# from piker.storage import open_tsdb_client +# from piker.service import open_piker_runtime + +# async def main(): +# nonlocal symbols + +# async with open_piker_runtime( +# 'storesh', +# enable_modules=['piker.service._ahab'], +# ): +# symbol = symbols[0] + +# async with open_tsdb_client(symbol): +# # TODO: ask if user wants to write history for detected +# # available shm buffers? +# from tractor.trionics import ipython_embed +# await ipython_embed() + +# trio.run(main) + + +@store.command() +def ls( + backends: list[str] = typer.Argument( + default=None, + help='Storage backends to query, default is all.' + ), +): + from piker.service import open_piker_runtime + from . import ( + __tsdbs__, + open_storage_client, + ) + from rich.table import Table + + if not backends: + backends: list[str] = __tsdbs__ + + table = Table(title=f'Table keys for backends {backends}:') + console = Console() + + async def query_all(): + nonlocal backends + + async with ( + open_piker_runtime( + 'tsdb_storage', + enable_modules=['piker.service._ahab'], + ), + ): + for backend in backends: + async with open_storage_client(name=backend) as ( + mod, + client, + ): + table.add_column(f'{mod.name} fqmes') + keys: list[str] = await client.list_keys() + for key in keys: + table.add_row(key) + + console.print(table) + + trio.run(query_all) + + +async def del_ts_by_timeframe( + client: Storage, + fqme: str, + timeframe: int, + +) -> None: + + resp = await client.delete_ts(fqme, timeframe) + + # TODO: encapsulate per backend errors.. + # - MEGA LOL, apparently the symbols don't + # flush out until you refresh something or other + # (maybe the WALFILE)... #lelandorlulzone, classic + # alpaca(Rtm) design here .. + # well, if we ever can make this work we + # probably want to dogsplain the real reason + # for the delete errurz..llululu + # if fqme not in syms: + # log.error(f'Pair {fqme} dne in DB') + msgish = resp.ListFields()[0][1] + if 'error' in str(msgish): + log.error( + f'Deletion error:\n' + f'backend: {client.name}\n' + f'fqme: {fqme}\n' + f'timeframe: {timeframe}s\n' + f'Error msg:\n\n{msgish}\n', + ) + + +@store.command() +def delete( symbols: list[str], + + backend: str = typer.Option( + default=None, + help='Storage backend to update' + ), + + # delete: bool = typer.Option(False, '-d'), + # host: str = typer.Option( + # 'localhost', + # '-h', + # ), + # port: int = typer.Option('5993', '-p'), ): ''' - Start an IPython shell ready to query the local marketstore db. + Delete a storage backend's time series for (table) keys provided as + ``symbols``. ''' - from piker.storage import open_tsdb_client from piker.service import open_piker_runtime + from . import open_storage_client - async def main(): - nonlocal symbols - - async with open_piker_runtime( - 'storesh', - enable_modules=['piker.service._ahab'], + async def main(symbols: list[str]): + async with ( + open_piker_runtime( + 'tsdb_storage', + enable_modules=['piker.service._ahab'] + ), + open_storage_client(name=backend) as (_, storage), + trio.open_nursery() as n, ): - symbol = symbols[0] + # spawn queries as tasks for max conc! + for fqme in symbols: + for tf in [1, 60]: + n.start_soon( + del_ts_by_timeframe, + storage, + fqme, + tf, + ) - async with open_tsdb_client(symbol): - # TODO: ask if user wants to write history for detected - # available shm buffers? - from tractor.trionics import ipython_embed - await ipython_embed() - - trio.run(main) + trio.run(main, symbols) -@cli.command() -@click.option( - '--host', - default='localhost' -) -@click.option( - '--port', - default=5993 -) -@click.option( - '--delete', - '-d', - is_flag=True, - help='Delete history (1 Min) for symbol(s)', -) -@click.argument('symbols', nargs=-1) -@click.pass_obj -def storage( - config, - host, - port, - symbols: list[str], - delete: bool, +typer_click_object = typer.main.get_command(store) +cli.add_command(typer_click_object, 'store') -): - ''' - Start an IPython shell ready to query the local marketstore db. +# @cli.command() +# @click.option('--test-file', '-t', help='Test quote stream file') +# @click.option('--tl', is_flag=True, help='Enable tractor logging') +# @click.argument('name', nargs=1, required=True) +# @click.pass_obj +# def ingest(config, name, test_file, tl): +# ''' +# Ingest real-time broker quotes and ticks to a marketstore instance. - ''' - from piker.storage import open_tsdb_client - from piker.service import open_piker_runtime +# ''' +# # global opts +# loglevel = config['loglevel'] +# tractorloglevel = config['tractorloglevel'] +# # log = config['log'] - async def main(): - nonlocal symbols +# watchlist_from_file = wl.ensure_watchlists(config['wl_path']) +# watchlists = wl.merge_watchlist(watchlist_from_file, wl._builtins) +# symbols = watchlists[name] - async with open_piker_runtime( - 'tsdb_storage', - enable_modules=['piker.service._ahab'], - ): - symbol = symbols[0] - async with open_tsdb_client(symbol) as storage: - if delete: - for fqme in symbols: - syms = await storage.client.list_symbols() +# grouped_syms = {} +# for sym in symbols: +# symbol, _, provider = sym.rpartition('.') +# if provider not in grouped_syms: +# grouped_syms[provider] = [] - resp60s = await storage.delete_ts(fqme, 60) +# grouped_syms[provider].append(symbol) - msgish = resp60s.ListFields()[0][1] - if 'error' in str(msgish): +# async def entry_point(): +# async with tractor.open_nursery() as n: +# for provider, symbols in grouped_syms.items(): +# await n.run_in_actor( +# ingest_quote_stream, +# name='ingest_marketstore', +# symbols=symbols, +# brokername=provider, +# tries=1, +# actorloglevel=loglevel, +# loglevel=tractorloglevel +# ) - # TODO: MEGA LOL, apparently the symbols don't - # flush out until you refresh something or other - # (maybe the WALFILE)... #lelandorlulzone, classic - # alpaca(Rtm) design here .. - # well, if we ever can make this work we - # probably want to dogsplain the real reason - # for the delete errurz..llululu - if fqme not in syms: - log.error(f'Pair {fqme} dne in DB') +# tractor.run(entry_point) - log.error(f'Deletion error: {fqme}\n{msgish}') - - resp1s = await storage.delete_ts(fqme, 1) - msgish = resp1s.ListFields()[0][1] - if 'error' in str(msgish): - log.error(f'Deletion error: {fqme}\n{msgish}') - - trio.run(main) - - -@cli.command() -@click.option('--test-file', '-t', help='Test quote stream file') -@click.option('--tl', is_flag=True, help='Enable tractor logging') -@click.argument('name', nargs=1, required=True) -@click.pass_obj -def ingest(config, name, test_file, tl): - ''' - Ingest real-time broker quotes and ticks to a marketstore instance. - - ''' - # global opts - loglevel = config['loglevel'] - tractorloglevel = config['tractorloglevel'] - # log = config['log'] - - watchlist_from_file = wl.ensure_watchlists(config['wl_path']) - watchlists = wl.merge_watchlist(watchlist_from_file, wl._builtins) - symbols = watchlists[name] - - grouped_syms = {} - for sym in symbols: - symbol, _, provider = sym.rpartition('.') - if provider not in grouped_syms: - grouped_syms[provider] = [] - - grouped_syms[provider].append(symbol) - - async def entry_point(): - async with tractor.open_nursery() as n: - for provider, symbols in grouped_syms.items(): - await n.run_in_actor( - ingest_quote_stream, - name='ingest_marketstore', - symbols=symbols, - brokername=provider, - tries=1, - actorloglevel=loglevel, - loglevel=tractorloglevel - ) - - tractor.run(entry_point) +# if __name__ == "__main__": +# store() # this is called from ``>> ledger `` diff --git a/piker/storage/marketstore.py b/piker/storage/marketstore.py index 9aad2230..d1a3d67f 100644 --- a/piker/storage/marketstore.py +++ b/piker/storage/marketstore.py @@ -65,11 +65,13 @@ from ..log import get_logger log = get_logger(__name__) -class Storage: +class MktsStorageClient: ''' High level storage api for both real-time and historical ingest. ''' + name: str = 'marketstore' + def __init__( self, client: MarketstoreClient, @@ -214,10 +216,9 @@ class Storage: ) -> bool: client = self.client - syms = await client.list_symbols() - if key not in syms: - await tractor.breakpoint() - raise KeyError(f'`{key}` table key not found in\n{syms}?') + # syms = await client.list_symbols() + # if key not in syms: + # raise KeyError(f'`{key}` table key not found in\n{syms}?') tbk = mk_tbk(( key, @@ -339,4 +340,4 @@ async def get_client( host or 'localhost', grpc_port, ) as client: - yield Storage(client) + yield MktsStorageClient(client)