piker/piker/data/cli.py

197 lines
5.0 KiB
Python

# piker: trading gear for hackers
# Copyright (C) 2018-present Tyler Goodlet (in stewardship of piker0)
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
marketstore cli.
"""
from functools import partial
from pprint import pformat
from anyio_marketstore import open_marketstore_client
import trio
import tractor
import click
import numpy as np
from .marketstore import (
get_client,
# stream_quotes,
ingest_quote_stream,
# _url,
_tick_tbk_ids,
mk_tbk,
)
from ..cli import cli
from .. import watchlists as wl
from ..log import get_logger
from ._sharedmem import (
maybe_open_shm_array,
)
from ._source import (
base_iohlc_dtype,
)
log = get_logger(__name__)
@cli.command()
@click.option(
    '--url',
    default='ws://localhost:5993/ws',
    help='HTTP URL of marketstore instance'
)
@click.argument('names', nargs=-1)
@click.pass_obj
def ms_stream(
    config: dict,
    names: list[str],
    url: str,
) -> None:
    '''
    Connect to a marketstore time bucket stream for (a set of) symbol(s)
    and print to console.

    '''
    # NOTE(review): the ``--url`` default uses the ``ws://`` scheme while
    # its help text says "HTTP URL" -- confirm which one is intended.
    # None of ``config``, ``names`` or ``url`` is read at the moment since
    # the streaming body below is stubbed out.

    async def _stream_placeholder() -> None:
        # TODO: restore the quote streaming loop, i.e. iterate
        # ``stream_quotes(symbols=names)`` and log each received quote,
        # once ``stream_quotes`` is imported again (its import is
        # commented out at the top of this module).
        return None

    # Run the (currently no-op) coroutine to completion under trio.
    trio.run(_stream_placeholder)
# @cli.command()
# @click.option(
# '--url',
# default=_url,
# help='HTTP URL of marketstore instance'
# )
# @click.argument('names', nargs=-1)
# @click.pass_obj
# def ms_destroy(config: dict, names: list[str], url: str) -> None:
# """Destroy symbol entries in the local marketstore instance.
# """
# async def main():
# nonlocal names
# async with get_client(url) as client:
#
# if not names:
# names = await client.list_symbols()
#
# # default is to wipe db entirely.
# answer = input(
# "This will entirely wipe your local marketstore db @ "
# f"{url} of the following symbols:\n {pformat(names)}"
# "\n\nDelete [N/y]?\n")
#
# if answer == 'y':
# for sym in names:
# # tbk = _tick_tbk.format(sym)
# tbk = tuple(sym, *_tick_tbk_ids)
# print(f"Destroying {tbk}..")
# await client.destroy(mk_tbk(tbk))
# else:
# print("Nothing deleted.")
#
# tractor.run(main)
@cli.command()
@click.option(
    '--tl',
    is_flag=True,
    help='Enable tractor logging')
@click.option(
    '--host',
    default='localhost'
)
@click.option(
    '--port',
    default=5993
)
@click.argument('symbols', nargs=-1)
@click.pass_obj
def storesh(
    config,
    tl,
    host,
    port,
    symbols: list[str],
):
    '''
    Start an IPython shell ready to query the local marketstore db.

    '''
    # NOTE(review): nothing in this function starts a shell directly;
    # presumably that happens inside ``tsdb_history_update()`` -- confirm.
    # ``config``, ``tl``, ``host`` and ``port`` are accepted from the
    # command line but not read here.

    # ``symbols`` is a variadic (``nargs=-1``) argument and so is empty
    # when none are given on the command line. Fail with a usage error
    # up front instead of hitting an ``IndexError`` on ``symbols[0]``
    # after the runtime has already been started.
    if not symbols:
        raise click.UsageError(
            'storesh requires at least one symbol, none were provided'
        )

    # Imported lazily, at command invocation time.
    from piker.data.marketstore import tsdb_history_update
    from piker._daemon import open_piker_runtime

    # Only the first symbol is used; any additional ones are ignored.
    symbol = symbols[0]

    async def main():
        # Run the history update inside a piker runtime named 'storesh'
        # with the ``piker.data._ahab`` module enabled.
        async with open_piker_runtime(
            'storesh',
            enable_modules=['piker.data._ahab'],
        ):
            await tsdb_history_update(symbol)

    trio.run(main)
@cli.command()
@click.option('--test-file', '-t', help='Test quote stream file')
@click.option('--tl', is_flag=True, help='Enable tractor logging')
@click.argument('name', nargs=1, required=True)
@click.pass_obj
def ingest(config, name, test_file, tl):
    '''
    Ingest real-time broker quotes and ticks to a marketstore instance.

    '''
    # ``test_file`` and ``tl`` are accepted from the command line but are
    # not read anywhere in this function.

    # Log levels come from the shared cli config object.
    actor_level = config['loglevel']
    runtime_level = config['tractorloglevel']

    # Look up the watchlist called ``name`` among the on-disk watchlists
    # merged with the built-in ones.
    file_watchlists = wl.ensure_watchlists(config['wl_path'])
    all_watchlists = wl.merge_watchlist(file_watchlists, wl._builtins)
    entries = all_watchlists[name]

    # Bucket the symbols by provider, taken as the text after the last
    # '.' of each entry (an entry with no '.' becomes the provider key
    # with an empty symbol, as ``str.rpartition()`` dictates).
    by_provider = {}
    for entry in entries:
        ticker, _, backend = entry.rpartition('.')
        by_provider.setdefault(backend, []).append(ticker)

    async def spawn_ingesters():
        # Start one ``ingest_quote_stream`` actor per provider, each
        # handed that provider's list of symbols.
        async with tractor.open_nursery() as nursery:
            for backend, tickers in by_provider.items():
                await nursery.run_in_actor(
                    ingest_quote_stream,
                    name='ingest_marketstore',
                    symbols=tickers,
                    brokername=backend,
                    tries=1,
                    actorloglevel=actor_level,
                    loglevel=runtime_level,
                )

    tractor.run(spawn_ingesters)