Load any symbol-matching shm array if no `marketstored` found
parent
39b4d2684a
commit
565573b609
|
@ -18,15 +18,14 @@
|
||||||
marketstore cli.
|
marketstore cli.
|
||||||
|
|
||||||
"""
|
"""
|
||||||
from typing import List
|
|
||||||
from functools import partial
|
from functools import partial
|
||||||
from pprint import pformat
|
from pprint import pformat
|
||||||
|
|
||||||
|
from anyio_marketstore import open_marketstore_client
|
||||||
import trio
|
import trio
|
||||||
import tractor
|
import tractor
|
||||||
import click
|
import click
|
||||||
|
import numpy as np
|
||||||
from anyio_marketstore import open_marketstore_client
|
|
||||||
|
|
||||||
from .marketstore import (
|
from .marketstore import (
|
||||||
get_client,
|
get_client,
|
||||||
|
@ -39,6 +38,12 @@ from .marketstore import (
|
||||||
from ..cli import cli
|
from ..cli import cli
|
||||||
from .. import watchlists as wl
|
from .. import watchlists as wl
|
||||||
from ..log import get_logger
|
from ..log import get_logger
|
||||||
|
from ._sharedmem import (
|
||||||
|
maybe_open_shm_array,
|
||||||
|
)
|
||||||
|
from ._source import (
|
||||||
|
base_iohlc_dtype,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
log = get_logger(__name__)
|
log = get_logger(__name__)
|
||||||
|
@ -52,10 +57,16 @@ log = get_logger(__name__)
|
||||||
)
|
)
|
||||||
@click.argument('names', nargs=-1)
|
@click.argument('names', nargs=-1)
|
||||||
@click.pass_obj
|
@click.pass_obj
|
||||||
def ms_stream(config: dict, names: List[str], url: str):
|
def ms_stream(
|
||||||
"""Connect to a marketstore time bucket stream for (a set of) symbols(s)
|
config: dict,
|
||||||
|
names: list[str],
|
||||||
|
url: str,
|
||||||
|
) -> None:
|
||||||
|
'''
|
||||||
|
Connect to a marketstore time bucket stream for (a set of) symbols(s)
|
||||||
and print to console.
|
and print to console.
|
||||||
"""
|
|
||||||
|
'''
|
||||||
async def main():
|
async def main():
|
||||||
# async for quote in stream_quotes(symbols=names):
|
# async for quote in stream_quotes(symbols=names):
|
||||||
# log.info(f"Received quote:\n{quote}")
|
# log.info(f"Received quote:\n{quote}")
|
||||||
|
@ -72,7 +83,7 @@ def ms_stream(config: dict, names: List[str], url: str):
|
||||||
# )
|
# )
|
||||||
# @click.argument('names', nargs=-1)
|
# @click.argument('names', nargs=-1)
|
||||||
# @click.pass_obj
|
# @click.pass_obj
|
||||||
# def ms_destroy(config: dict, names: List[str], url: str) -> None:
|
# def ms_destroy(config: dict, names: list[str], url: str) -> None:
|
||||||
# """Destroy symbol entries in the local marketstore instance.
|
# """Destroy symbol entries in the local marketstore instance.
|
||||||
# """
|
# """
|
||||||
# async def main():
|
# async def main():
|
||||||
|
@ -113,23 +124,56 @@ def ms_stream(config: dict, names: List[str], url: str):
|
||||||
'--port',
|
'--port',
|
||||||
default=5993
|
default=5993
|
||||||
)
|
)
|
||||||
|
@click.argument('symbols', nargs=-1)
|
||||||
@click.pass_obj
|
@click.pass_obj
|
||||||
def ms_shell(config, tl, host, port):
|
def ms_shell(
|
||||||
|
config,
|
||||||
|
tl,
|
||||||
|
host,
|
||||||
|
port,
|
||||||
|
symbols: list[str],
|
||||||
|
):
|
||||||
'''
|
'''
|
||||||
Start an IPython shell ready to query the local marketstore db.
|
Start an IPython shell ready to query the local marketstore db.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
from piker.data.marketstore import backfill_history_diff
|
from piker.data.marketstore import backfill_history_diff
|
||||||
from piker._daemon import open_piker_runtime
|
from piker._daemon import open_piker_runtime
|
||||||
|
|
||||||
async def main():
|
async def main():
|
||||||
|
nonlocal symbols
|
||||||
|
|
||||||
async with open_piker_runtime(
|
async with open_piker_runtime(
|
||||||
'ms_shell',
|
'ms_shell',
|
||||||
enable_modules=['piker.data._ahab'],
|
enable_modules=['piker.data._ahab'],
|
||||||
):
|
):
|
||||||
await backfill_history_diff()
|
try:
|
||||||
# TODO: write magics to query marketstore
|
await backfill_history_diff()
|
||||||
# from IPython import embed
|
except OSError:
|
||||||
# embed()
|
# TODO: write magics to query marketstore
|
||||||
|
|
||||||
|
sym = symbols[0]
|
||||||
|
symbol, _, broker = sym.rpartition('.')
|
||||||
|
# (maybe) allocate shm array for this broker/symbol which will
|
||||||
|
# be used for fast near-term history capture and processing.
|
||||||
|
shm, opened = maybe_open_shm_array(
|
||||||
|
key=sym,
|
||||||
|
dtype=base_iohlc_dtype,
|
||||||
|
)
|
||||||
|
# load anything found in shm
|
||||||
|
from numpy.lib.recfunctions import structured_to_unstructured
|
||||||
|
mxmn = structured_to_unstructured(
|
||||||
|
shm.array[['low', 'high']],
|
||||||
|
# dtype=[('mxmn', '<f8'), ('index', '<i8')],
|
||||||
|
).flatten()
|
||||||
|
|
||||||
|
from piker.ui._compression import downsample
|
||||||
|
xd, yd = downsample(
|
||||||
|
y=mxmn,
|
||||||
|
x=np.arange(len(mxmn)),
|
||||||
|
bins=4,
|
||||||
|
)
|
||||||
|
await tractor.breakpoint()
|
||||||
|
|
||||||
trio.run(main)
|
trio.run(main)
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue