diff --git a/piker/data/cli.py b/piker/data/cli.py index 6ea2503d..29ccf2cf 100644 --- a/piker/data/cli.py +++ b/piker/data/cli.py @@ -18,15 +18,14 @@ marketstore cli. """ -from typing import List from functools import partial from pprint import pformat +from anyio_marketstore import open_marketstore_client import trio import tractor import click - -from anyio_marketstore import open_marketstore_client +import numpy as np from .marketstore import ( get_client, @@ -39,6 +38,12 @@ from .marketstore import ( from ..cli import cli from .. import watchlists as wl from ..log import get_logger +from ._sharedmem import ( + maybe_open_shm_array, +) +from ._source import ( + base_iohlc_dtype, +) log = get_logger(__name__) @@ -52,10 +57,16 @@ log = get_logger(__name__) ) @click.argument('names', nargs=-1) @click.pass_obj -def ms_stream(config: dict, names: List[str], url: str): - """Connect to a marketstore time bucket stream for (a set of) symbols(s) +def ms_stream( + config: dict, + names: list[str], + url: str, +) -> None: + ''' + Connect to a marketstore time bucket stream for (a set of) symbols(s) and print to console. - """ + + ''' async def main(): # async for quote in stream_quotes(symbols=names): # log.info(f"Received quote:\n{quote}") @@ -72,7 +83,7 @@ def ms_stream(config: dict, names: List[str], url: str): # ) # @click.argument('names', nargs=-1) # @click.pass_obj -# def ms_destroy(config: dict, names: List[str], url: str) -> None: +# def ms_destroy(config: dict, names: list[str], url: str) -> None: # """Destroy symbol entries in the local marketstore instance. # """ # async def main(): @@ -113,23 +124,56 @@ def ms_stream(config: dict, names: List[str], url: str): '--port', default=5993 ) +@click.argument('symbols', nargs=-1) @click.pass_obj -def ms_shell(config, tl, host, port): +def ms_shell( + config, + tl, + host, + port, + symbols: list[str], +): ''' Start an IPython shell ready to query the local marketstore db. ''' from piker.data.marketstore import backfill_history_diff from piker._daemon import open_piker_runtime + async def main(): + nonlocal symbols + async with open_piker_runtime( 'ms_shell', enable_modules=['piker.data._ahab'], ): - await backfill_history_diff() - # TODO: write magics to query marketstore - # from IPython import embed - # embed() + try: + await backfill_history_diff() + except OSError: + # TODO: write magics to query marketstore + + sym = symbols[0] + symbol, _, broker = sym.rpartition('.') + # (maybe) allocate shm array for this broker/symbol which will + # be used for fast near-term history capture and processing. + shm, opened = maybe_open_shm_array( + key=sym, + dtype=base_iohlc_dtype, + ) + # load anything found in shm + from numpy.lib.recfunctions import structured_to_unstructured + mxmn = structured_to_unstructured( + shm.array[['low', 'high']], + # dtype=[('mxmn', '