Load any symbol-matching shm array if no `marketstored` found

marketstore
Tyler Goodlet 2022-03-09 21:07:48 -05:00
parent 97211a0a7e
commit 1c3457d829
1 changed files with 56 additions and 12 deletions

View File

@ -18,15 +18,14 @@
marketstore cli. marketstore cli.
""" """
from typing import List
from functools import partial from functools import partial
from pprint import pformat from pprint import pformat
from anyio_marketstore import open_marketstore_client
import trio import trio
import tractor import tractor
import click import click
import numpy as np
from anyio_marketstore import open_marketstore_client
from .marketstore import ( from .marketstore import (
get_client, get_client,
@ -39,6 +38,12 @@ from .marketstore import (
from ..cli import cli from ..cli import cli
from .. import watchlists as wl from .. import watchlists as wl
from ..log import get_logger from ..log import get_logger
from ._sharedmem import (
maybe_open_shm_array,
)
from ._source import (
base_iohlc_dtype,
)
log = get_logger(__name__) log = get_logger(__name__)
@ -52,10 +57,16 @@ log = get_logger(__name__)
) )
@click.argument('names', nargs=-1) @click.argument('names', nargs=-1)
@click.pass_obj @click.pass_obj
def ms_stream(config: dict, names: List[str], url: str): def ms_stream(
"""Connect to a marketstore time bucket stream for (a set of) symbols(s) config: dict,
names: list[str],
url: str,
) -> None:
'''
Connect to a marketstore time bucket stream for (a set of) symbols(s)
and print to console. and print to console.
"""
'''
async def main(): async def main():
# async for quote in stream_quotes(symbols=names): # async for quote in stream_quotes(symbols=names):
# log.info(f"Received quote:\n{quote}") # log.info(f"Received quote:\n{quote}")
@ -72,7 +83,7 @@ def ms_stream(config: dict, names: List[str], url: str):
# ) # )
# @click.argument('names', nargs=-1) # @click.argument('names', nargs=-1)
# @click.pass_obj # @click.pass_obj
# def ms_destroy(config: dict, names: List[str], url: str) -> None: # def ms_destroy(config: dict, names: list[str], url: str) -> None:
# """Destroy symbol entries in the local marketstore instance. # """Destroy symbol entries in the local marketstore instance.
# """ # """
# async def main(): # async def main():
@ -113,23 +124,56 @@ def ms_stream(config: dict, names: List[str], url: str):
'--port', '--port',
default=5993 default=5993
) )
@click.argument('symbols', nargs=-1)
@click.pass_obj @click.pass_obj
def ms_shell(config, tl, host, port): def ms_shell(
config,
tl,
host,
port,
symbols: list[str],
):
''' '''
Start an IPython shell ready to query the local marketstore db. Start an IPython shell ready to query the local marketstore db.
''' '''
from piker.data.marketstore import backfill_history_diff from piker.data.marketstore import backfill_history_diff
from piker._daemon import open_piker_runtime from piker._daemon import open_piker_runtime
async def main(): async def main():
nonlocal symbols
async with open_piker_runtime( async with open_piker_runtime(
'ms_shell', 'ms_shell',
enable_modules=['piker.data._ahab'], enable_modules=['piker.data._ahab'],
): ):
try:
await backfill_history_diff() await backfill_history_diff()
except OSError:
# TODO: write magics to query marketstore # TODO: write magics to query marketstore
# from IPython import embed
# embed() sym = symbols[0]
symbol, _, broker = sym.rpartition('.')
# (maybe) allocate shm array for this broker/symbol which will
# be used for fast near-term history capture and processing.
shm, opened = maybe_open_shm_array(
key=sym,
dtype=base_iohlc_dtype,
)
# load anything found in shm
from numpy.lib.recfunctions import structured_to_unstructured
mxmn = structured_to_unstructured(
shm.array[['low', 'high']],
# dtype=[('mxmn', '<f8'), ('index', '<i8')],
).flatten()
from piker.ui._compression import downsample
xd, yd = downsample(
y=mxmn,
x=np.arange(len(mxmn)),
bins=4,
)
await tractor.breakpoint()
trio.run(main) trio.run(main)