From e83de2906f92b99f80cba78075bff0121ec0a9a4 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 29 May 2023 20:11:57 -0400 Subject: [PATCH] Relegate old marketstore cli eps to masked module --- piker/storage/cli.py | 164 +--------------- .../__init__.py} | 0 piker/storage/marketstore/_ingest.py | 177 ++++++++++++++++++ 3 files changed, 179 insertions(+), 162 deletions(-) rename piker/storage/{marketstore.py => marketstore/__init__.py} (100%) create mode 100644 piker/storage/marketstore/_ingest.py diff --git a/piker/storage/cli.py b/piker/storage/cli.py index d2148109..352db2cd 100644 --- a/piker/storage/cli.py +++ b/piker/storage/cli.py @@ -15,142 +15,27 @@ # along with this program. If not, see . """ -marketstore cli. +Storage middle-ware CLIs. """ from __future__ import annotations from typing import TYPE_CHECKING -# import tractor import trio -# import click from rich.console import Console # from rich.markdown import Markdown import typer -from ..service.marketstore import ( - # get_client, - # stream_quotes, - ingest_quote_stream, - # _url, - # _tick_tbk_ids, - # mk_tbk, -) from ..cli import cli -from .. import watchlists as wl from . import ( log, ) - if TYPE_CHECKING: from . import Storage + store = typer.Typer() -# @cli.command() -# @click.option( -# '--url', -# default='ws://localhost:5993/ws', -# help='HTTP URL of marketstore instance' -# ) -# @click.argument('names', nargs=-1) -# @click.pass_obj -# def ms_stream( -# config: dict, -# names: list[str], -# url: str, -# ) -> None: -# ''' -# Connect to a marketstore time bucket stream for (a set of) symbols(s) -# and print to console. - -# ''' -# async def main(): -# # async for quote in stream_quotes(symbols=names): -# # log.info(f"Received quote:\n{quote}") -# ... - -# trio.run(main) - - -# @cli.command() -# @click.option( -# '--url', -# default=_url, -# help='HTTP URL of marketstore instance' -# ) -# @click.argument('names', nargs=-1) -# @click.pass_obj -# def ms_destroy(config: dict, names: list[str], url: str) -> None: -# """Destroy symbol entries in the local marketstore instance. -# """ -# async def main(): -# nonlocal names -# async with get_client(url) as client: -# -# if not names: -# names = await client.list_symbols() -# -# # default is to wipe db entirely. -# answer = input( -# "This will entirely wipe you local marketstore db @ " -# f"{url} of the following symbols:\n {pformat(names)}" -# "\n\nDelete [N/y]?\n") -# -# if answer == 'y': -# for sym in names: -# # tbk = _tick_tbk.format(sym) -# tbk = tuple(sym, *_tick_tbk_ids) -# print(f"Destroying {tbk}..") -# await client.destroy(mk_tbk(tbk)) -# else: -# print("Nothing deleted.") -# -# tractor.run(main) - - -# @cli.command() -# @click.option( -# '--tsdb_host', -# default='localhost' -# ) -# @click.option( -# '--tsdb_port', -# default=5993 -# ) -# @click.argument('symbols', nargs=-1) -# @click.pass_obj -# def storesh( -# config, -# tl, -# host, -# port, -# symbols: list[str], -# ): -# ''' -# Start an IPython shell ready to query the local marketstore db. - -# ''' -# from piker.storage import open_tsdb_client -# from piker.service import open_piker_runtime - -# async def main(): -# nonlocal symbols - -# async with open_piker_runtime( -# 'storesh', -# enable_modules=['piker.service._ahab'], -# ): -# symbol = symbols[0] - -# async with open_tsdb_client(symbol): -# # TODO: ask if user wants to write history for detected -# # available shm buffers? -# from tractor.trionics import ipython_embed -# await ipython_embed() - -# trio.run(main) - @store.command() def ls( @@ -274,48 +159,3 @@ def delete( typer_click_object = typer.main.get_command(store) cli.add_command(typer_click_object, 'store') - -# @cli.command() -# @click.option('--test-file', '-t', help='Test quote stream file') -# @click.option('--tl', is_flag=True, help='Enable tractor logging') -# @click.argument('name', nargs=1, required=True) -# @click.pass_obj -# def ingest(config, name, test_file, tl): -# ''' -# Ingest real-time broker quotes and ticks to a marketstore instance. - -# ''' -# # global opts -# loglevel = config['loglevel'] -# tractorloglevel = config['tractorloglevel'] -# # log = config['log'] - -# watchlist_from_file = wl.ensure_watchlists(config['wl_path']) -# watchlists = wl.merge_watchlist(watchlist_from_file, wl._builtins) -# symbols = watchlists[name] - -# grouped_syms = {} -# for sym in symbols: -# symbol, _, provider = sym.rpartition('.') -# if provider not in grouped_syms: -# grouped_syms[provider] = [] - -# grouped_syms[provider].append(symbol) - -# async def entry_point(): -# async with tractor.open_nursery() as n: -# for provider, symbols in grouped_syms.items(): -# await n.run_in_actor( -# ingest_quote_stream, -# name='ingest_marketstore', -# symbols=symbols, -# brokername=provider, -# tries=1, -# actorloglevel=loglevel, -# loglevel=tractorloglevel -# ) - -# tractor.run(entry_point) - -# if __name__ == "__main__": -# store() # this is called from ``>> ledger `` diff --git a/piker/storage/marketstore.py b/piker/storage/marketstore/__init__.py similarity index 100% rename from piker/storage/marketstore.py rename to piker/storage/marketstore/__init__.py diff --git a/piker/storage/marketstore/_ingest.py b/piker/storage/marketstore/_ingest.py new file mode 100644 index 00000000..7056399b --- /dev/null +++ b/piker/storage/marketstore/_ingest.py @@ -0,0 +1,177 @@ +# piker: trading gear for hackers +# Copyright (C) 2018-present Tyler Goodlet (in stewardship of piker0) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +''' +Legacy marketstore ingest and streaming related clis. + +''' +# from .. import watchlists as wl +# from ..service.marketstore import ( + # get_client, + # stream_quotes, + # ingest_quote_stream, + # _url, + # _tick_tbk_ids, + # mk_tbk, +# ) + +# @cli.command() +# @click.option( +# '--url', +# default='ws://localhost:5993/ws', +# help='HTTP URL of marketstore instance' +# ) +# @click.argument('names', nargs=-1) +# @click.pass_obj +# def ms_stream( +# config: dict, +# names: list[str], +# url: str, +# ) -> None: +# ''' +# Connect to a marketstore time bucket stream for (a set of) symbols(s) +# and print to console. + +# ''' +# async def main(): +# # async for quote in stream_quotes(symbols=names): +# # log.info(f"Received quote:\n{quote}") +# ... + +# trio.run(main) + + +# @cli.command() +# @click.option( +# '--url', +# default=_url, +# help='HTTP URL of marketstore instance' +# ) +# @click.argument('names', nargs=-1) +# @click.pass_obj +# def ms_destroy(config: dict, names: list[str], url: str) -> None: +# """Destroy symbol entries in the local marketstore instance. +# """ +# async def main(): +# nonlocal names +# async with get_client(url) as client: +# +# if not names: +# names = await client.list_symbols() +# +# # default is to wipe db entirely. +# answer = input( +# "This will entirely wipe you local marketstore db @ " +# f"{url} of the following symbols:\n {pformat(names)}" +# "\n\nDelete [N/y]?\n") +# +# if answer == 'y': +# for sym in names: +# # tbk = _tick_tbk.format(sym) +# tbk = tuple(sym, *_tick_tbk_ids) +# print(f"Destroying {tbk}..") +# await client.destroy(mk_tbk(tbk)) +# else: +# print("Nothing deleted.") +# +# tractor.run(main) + + +# @cli.command() +# @click.option( +# '--tsdb_host', +# default='localhost' +# ) +# @click.option( +# '--tsdb_port', +# default=5993 +# ) +# @click.argument('symbols', nargs=-1) +# @click.pass_obj +# def storesh( +# config, +# tl, +# host, +# port, +# symbols: list[str], +# ): +# ''' +# Start an IPython shell ready to query the local marketstore db. + +# ''' +# from piker.storage import open_tsdb_client +# from piker.service import open_piker_runtime + +# async def main(): +# nonlocal symbols + +# async with open_piker_runtime( +# 'storesh', +# enable_modules=['piker.service._ahab'], +# ): +# symbol = symbols[0] + +# async with open_tsdb_client(symbol): +# # TODO: ask if user wants to write history for detected +# # available shm buffers? +# from tractor.trionics import ipython_embed +# await ipython_embed() + +# trio.run(main) + + +# @cli.command() +# @click.option('--test-file', '-t', help='Test quote stream file') +# @click.option('--tl', is_flag=True, help='Enable tractor logging') +# @click.argument('name', nargs=1, required=True) +# @click.pass_obj +# def ingest(config, name, test_file, tl): +# ''' +# Ingest real-time broker quotes and ticks to a marketstore instance. + +# ''' +# # global opts +# loglevel = config['loglevel'] +# tractorloglevel = config['tractorloglevel'] +# # log = config['log'] + +# watchlist_from_file = wl.ensure_watchlists(config['wl_path']) +# watchlists = wl.merge_watchlist(watchlist_from_file, wl._builtins) +# symbols = watchlists[name] + +# grouped_syms = {} +# for sym in symbols: +# symbol, _, provider = sym.rpartition('.') +# if provider not in grouped_syms: +# grouped_syms[provider] = [] + +# grouped_syms[provider].append(symbol) + +# async def entry_point(): +# async with tractor.open_nursery() as n: +# for provider, symbols in grouped_syms.items(): +# await n.run_in_actor( +# ingest_quote_stream, +# name='ingest_marketstore', +# symbols=symbols, +# brokername=provider, +# tries=1, +# actorloglevel=loglevel, +# loglevel=tractorloglevel +# ) + +# tractor.run(entry_point) +