Re-implement `piker store` CLI with `typer`

Turns out you can mix and match `click` with `typer` so this moves what
was the `.data.cli` stuff into `storage.cli` and uses the integration
api to make it all work B)

New subcmd: `piker store`
- add `piker store ls` which lists all fqme keyed time-series from backend.
- add `store delete` to remove any such key->time-series.
  - now uses a nursery for multi-timeframe concurrency B)

Mask out all the old `marketstore` specific subcmds for now (streaming,
ingest, storesh, etc..) in anticipation of moving them into
a subpkg-module and make sure to import the sub-cmd module in our top
level cli package.

Other `.storage` api tweaks:
- drop the reraising with custom error (for now).
- rename `Storage` -> `StorageClient` (or should it be API?).
basic_buy_bot
Tyler Goodlet 2023-05-29 17:41:40 -04:00
parent 1ec9b0565f
commit cb774e5a5d
4 changed files with 265 additions and 184 deletions

View File

@ -154,6 +154,8 @@ def cli(
assert os.path.isdir(configdir), f"`{configdir}` is not a valid path" assert os.path.isdir(configdir), f"`{configdir}` is not a valid path"
config._override_config_dir(configdir) config._override_config_dir(configdir)
# TODO: for typer see
# https://typer.tiangolo.com/tutorial/commands/context/
ctx.ensure_object(dict) ctx.ensure_object(dict)
if not brokers: if not brokers:
@ -227,12 +229,15 @@ def services(config, tl, ports):
def _load_clis() -> None: def _load_clis() -> None:
from ..service import marketstore # noqa from ..service import marketstore # noqa
from ..service import elastic from ..service import elastic # noqa
from ..data import cli # noqa
from ..brokers import cli # noqa from ..brokers import cli # noqa
from ..ui import cli # noqa from ..ui import cli # noqa
from ..watchlists import cli # noqa from ..watchlists import cli # noqa
# typer implemented
from ..storage import cli # noqa
from ..accounting import cli # noqa
# load downstream cli modules # load downstream cli modules
_load_clis() _load_clis()

View File

@ -61,7 +61,12 @@ get_console_log = partial(
) )
class Storage( __tsdbs__: list[str] = [
'marketstore',
]
class StorageClient(
Protocol, Protocol,
): ):
''' '''
@ -69,6 +74,8 @@ class Storage(
in order to suffice the historical data mgmt layer. in order to suffice the historical data mgmt layer.
''' '''
name: str
@abstractmethod @abstractmethod
async def list_keys(self) -> list[str]: async def list_keys(self) -> list[str]:
... ...
@ -152,12 +159,14 @@ def get_storagemod(name: str) -> ModuleType:
async def open_storage_client( async def open_storage_client(
name: str | None = None, name: str | None = None,
) -> tuple[ModuleType, Storage]: ) -> tuple[ModuleType, StorageClient]:
''' '''
Load the ``Storage`` client for named backend. Load the ``StorageClient`` for named backend.
''' '''
# load root config for tsdb tsdb_host: str = 'localhost'
# load root config and any tsdb user defined settings
conf, path = config.load('conf', touch_if_dne=True) conf, path = config.load('conf', touch_if_dne=True)
net = conf.get('network') net = conf.get('network')
if net: if net:
@ -185,17 +194,17 @@ async def open_storage_client(
else: else:
log.info(f'Attempting to connect to remote {name}@{tsdbconf}') log.info(f'Attempting to connect to remote {name}@{tsdbconf}')
try: # try:
async with ( async with (
get_client(**tsdbconf) as client, get_client(**tsdbconf) as client,
): ):
# slap on our wrapper api # slap on our wrapper api
yield mod, client yield mod, client
except Exception as err: # except Exception as err:
raise StorageConnectionError( # raise StorageConnectionError(
f'No connection to {name}' # f'No connection to {name}'
) from err # ) from err
# NOTE: pretty sure right now this is only being # NOTE: pretty sure right now this is only being
@ -203,7 +212,7 @@ async def open_storage_client(
@acm @acm
async def open_tsdb_client( async def open_tsdb_client(
fqme: str, fqme: str,
) -> Storage: ) -> StorageClient:
# TODO: real-time dedicated task for ensuring # TODO: real-time dedicated task for ensuring
# history consistency between the tsdb, shm and real-time feed.. # history consistency between the tsdb, shm and real-time feed..

View File

@ -18,9 +18,14 @@
marketstore cli. marketstore cli.
""" """
from __future__ import annotations
from typing import TYPE_CHECKING
# import tractor
import trio import trio
import tractor # import click
import click from rich.console import Console
# from rich.markdown import Markdown
import typer
from ..service.marketstore import ( from ..service.marketstore import (
# get_client, # get_client,
@ -32,35 +37,40 @@ from ..service.marketstore import (
) )
from ..cli import cli from ..cli import cli
from .. import watchlists as wl from .. import watchlists as wl
from ._util import ( from . import (
log, log,
) )
@cli.command() if TYPE_CHECKING:
@click.option( from . import Storage
'--url',
default='ws://localhost:5993/ws',
help='HTTP URL of marketstore instance'
)
@click.argument('names', nargs=-1)
@click.pass_obj
def ms_stream(
config: dict,
names: list[str],
url: str,
) -> None:
'''
Connect to a marketstore time bucket stream for (a set of) symbols(s)
and print to console.
''' store = typer.Typer()
async def main():
# async for quote in stream_quotes(symbols=names):
# log.info(f"Received quote:\n{quote}")
...
trio.run(main) # @cli.command()
# @click.option(
# '--url',
# default='ws://localhost:5993/ws',
# help='HTTP URL of marketstore instance'
# )
# @click.argument('names', nargs=-1)
# @click.pass_obj
# def ms_stream(
# config: dict,
# names: list[str],
# url: str,
# ) -> None:
# '''
# Connect to a marketstore time bucket stream for (a set of) symbols(s)
# and print to console.
# '''
# async def main():
# # async for quote in stream_quotes(symbols=names):
# # log.info(f"Received quote:\n{quote}")
# ...
# trio.run(main)
# @cli.command() # @cli.command()
@ -99,157 +109,213 @@ def ms_stream(
# tractor.run(main) # tractor.run(main)
@cli.command() # @cli.command()
@click.option( # @click.option(
'--tsdb_host', # '--tsdb_host',
default='localhost' # default='localhost'
) # )
@click.option( # @click.option(
'--tsdb_port', # '--tsdb_port',
default=5993 # default=5993
) # )
@click.argument('symbols', nargs=-1) # @click.argument('symbols', nargs=-1)
@click.pass_obj # @click.pass_obj
def storesh( # def storesh(
config, # config,
tl, # tl,
host, # host,
port, # port,
symbols: list[str], # symbols: list[str],
): # ):
''' # '''
Start an IPython shell ready to query the local marketstore db. # Start an IPython shell ready to query the local marketstore db.
''' # '''
from piker.storage import open_tsdb_client # from piker.storage import open_tsdb_client
# from piker.service import open_piker_runtime
# async def main():
# nonlocal symbols
# async with open_piker_runtime(
# 'storesh',
# enable_modules=['piker.service._ahab'],
# ):
# symbol = symbols[0]
# async with open_tsdb_client(symbol):
# # TODO: ask if user wants to write history for detected
# # available shm buffers?
# from tractor.trionics import ipython_embed
# await ipython_embed()
# trio.run(main)
@store.command()
def ls(
backends: list[str] = typer.Argument(
default=None,
help='Storage backends to query, default is all.'
),
):
from piker.service import open_piker_runtime from piker.service import open_piker_runtime
from . import (
async def main(): __tsdbs__,
nonlocal symbols open_storage_client,
async with open_piker_runtime(
'storesh',
enable_modules=['piker.service._ahab'],
):
symbol = symbols[0]
async with open_tsdb_client(symbol):
# TODO: ask if user wants to write history for detected
# available shm buffers?
from tractor.trionics import ipython_embed
await ipython_embed()
trio.run(main)
@cli.command()
@click.option(
'--host',
default='localhost'
) )
@click.option( from rich.table import Table
'--port',
default=5993
)
@click.option(
'--delete',
'-d',
is_flag=True,
help='Delete history (1 Min) for symbol(s)',
)
@click.argument('symbols', nargs=-1)
@click.pass_obj
def storage(
config,
host,
port,
symbols: list[str],
delete: bool,
): if not backends:
''' backends: list[str] = __tsdbs__
Start an IPython shell ready to query the local marketstore db.
''' table = Table(title=f'Table keys for backends {backends}:')
from piker.storage import open_tsdb_client console = Console()
from piker.service import open_piker_runtime
async def main(): async def query_all():
nonlocal symbols nonlocal backends
async with open_piker_runtime( async with (
open_piker_runtime(
'tsdb_storage', 'tsdb_storage',
enable_modules=['piker.service._ahab'], enable_modules=['piker.service._ahab'],
),
): ):
symbol = symbols[0] for backend in backends:
async with open_tsdb_client(symbol) as storage: async with open_storage_client(name=backend) as (
if delete: mod,
for fqme in symbols: client,
syms = await storage.client.list_symbols() ):
table.add_column(f'{mod.name} fqmes')
keys: list[str] = await client.list_keys()
for key in keys:
table.add_row(key)
resp60s = await storage.delete_ts(fqme, 60) console.print(table)
msgish = resp60s.ListFields()[0][1] trio.run(query_all)
if 'error' in str(msgish):
# TODO: MEGA LOL, apparently the symbols don't
async def del_ts_by_timeframe(
client: Storage,
fqme: str,
timeframe: int,
) -> None:
resp = await client.delete_ts(fqme, timeframe)
# TODO: encapsulate per backend errors..
# - MEGA LOL, apparently the symbols don't
# flush out until you refresh something or other # flush out until you refresh something or other
# (maybe the WALFILE)... #lelandorlulzone, classic # (maybe the WALFILE)... #lelandorlulzone, classic
# alpaca(Rtm) design here .. # alpaca(Rtm) design here ..
# well, if we ever can make this work we # well, if we ever can make this work we
# probably want to dogsplain the real reason # probably want to dogsplain the real reason
# for the delete errurz..llululu # for the delete errurz..llululu
if fqme not in syms: # if fqme not in syms:
log.error(f'Pair {fqme} dne in DB') # log.error(f'Pair {fqme} dne in DB')
msgish = resp.ListFields()[0][1]
log.error(f'Deletion error: {fqme}\n{msgish}')
resp1s = await storage.delete_ts(fqme, 1)
msgish = resp1s.ListFields()[0][1]
if 'error' in str(msgish): if 'error' in str(msgish):
log.error(f'Deletion error: {fqme}\n{msgish}') log.error(
f'Deletion error:\n'
trio.run(main) f'backend: {client.name}\n'
f'fqme: {fqme}\n'
f'timeframe: {timeframe}s\n'
@cli.command() f'Error msg:\n\n{msgish}\n',
@click.option('--test-file', '-t', help='Test quote stream file')
@click.option('--tl', is_flag=True, help='Enable tractor logging')
@click.argument('name', nargs=1, required=True)
@click.pass_obj
def ingest(config, name, test_file, tl):
'''
Ingest real-time broker quotes and ticks to a marketstore instance.
'''
# global opts
loglevel = config['loglevel']
tractorloglevel = config['tractorloglevel']
# log = config['log']
watchlist_from_file = wl.ensure_watchlists(config['wl_path'])
watchlists = wl.merge_watchlist(watchlist_from_file, wl._builtins)
symbols = watchlists[name]
grouped_syms = {}
for sym in symbols:
symbol, _, provider = sym.rpartition('.')
if provider not in grouped_syms:
grouped_syms[provider] = []
grouped_syms[provider].append(symbol)
async def entry_point():
async with tractor.open_nursery() as n:
for provider, symbols in grouped_syms.items():
await n.run_in_actor(
ingest_quote_stream,
name='ingest_marketstore',
symbols=symbols,
brokername=provider,
tries=1,
actorloglevel=loglevel,
loglevel=tractorloglevel
) )
tractor.run(entry_point)
@store.command()
def delete(
symbols: list[str],
backend: str = typer.Option(
default=None,
help='Storage backend to update'
),
# delete: bool = typer.Option(False, '-d'),
# host: str = typer.Option(
# 'localhost',
# '-h',
# ),
# port: int = typer.Option('5993', '-p'),
):
'''
Delete a storage backend's time series for (table) keys provided as
``symbols``.
'''
from piker.service import open_piker_runtime
from . import open_storage_client
async def main(symbols: list[str]):
async with (
open_piker_runtime(
'tsdb_storage',
enable_modules=['piker.service._ahab']
),
open_storage_client(name=backend) as (_, storage),
trio.open_nursery() as n,
):
# spawn queries as tasks for max conc!
for fqme in symbols:
for tf in [1, 60]:
n.start_soon(
del_ts_by_timeframe,
storage,
fqme,
tf,
)
trio.run(main, symbols)
typer_click_object = typer.main.get_command(store)
cli.add_command(typer_click_object, 'store')
# @cli.command()
# @click.option('--test-file', '-t', help='Test quote stream file')
# @click.option('--tl', is_flag=True, help='Enable tractor logging')
# @click.argument('name', nargs=1, required=True)
# @click.pass_obj
# def ingest(config, name, test_file, tl):
# '''
# Ingest real-time broker quotes and ticks to a marketstore instance.
# '''
# # global opts
# loglevel = config['loglevel']
# tractorloglevel = config['tractorloglevel']
# # log = config['log']
# watchlist_from_file = wl.ensure_watchlists(config['wl_path'])
# watchlists = wl.merge_watchlist(watchlist_from_file, wl._builtins)
# symbols = watchlists[name]
# grouped_syms = {}
# for sym in symbols:
# symbol, _, provider = sym.rpartition('.')
# if provider not in grouped_syms:
# grouped_syms[provider] = []
# grouped_syms[provider].append(symbol)
# async def entry_point():
# async with tractor.open_nursery() as n:
# for provider, symbols in grouped_syms.items():
# await n.run_in_actor(
# ingest_quote_stream,
# name='ingest_marketstore',
# symbols=symbols,
# brokername=provider,
# tries=1,
# actorloglevel=loglevel,
# loglevel=tractorloglevel
# )
# tractor.run(entry_point)
# if __name__ == "__main__":
# store() # this is called from ``>> ledger <accountname>``

View File

@ -65,11 +65,13 @@ from ..log import get_logger
log = get_logger(__name__) log = get_logger(__name__)
class Storage: class MktsStorageClient:
''' '''
High level storage api for both real-time and historical ingest. High level storage api for both real-time and historical ingest.
''' '''
name: str = 'marketstore'
def __init__( def __init__(
self, self,
client: MarketstoreClient, client: MarketstoreClient,
@ -214,10 +216,9 @@ class Storage:
) -> bool: ) -> bool:
client = self.client client = self.client
syms = await client.list_symbols() # syms = await client.list_symbols()
if key not in syms: # if key not in syms:
await tractor.breakpoint() # raise KeyError(f'`{key}` table key not found in\n{syms}?')
raise KeyError(f'`{key}` table key not found in\n{syms}?')
tbk = mk_tbk(( tbk = mk_tbk((
key, key,
@ -339,4 +340,4 @@ async def get_client(
host or 'localhost', host or 'localhost',
grpc_port, grpc_port,
) as client: ) as client:
yield Storage(client) yield MktsStorageClient(client)