Load any symbol-matching shm array if no `marketstored` found
							parent
							
								
									1eb62e7678
								
							
						
					
					
						commit
						58f6aa4308
					
				| 
						 | 
				
			
			@ -18,15 +18,14 @@
 | 
			
		|||
marketstore cli.
 | 
			
		||||
 | 
			
		||||
"""
 | 
			
		||||
from typing import List
 | 
			
		||||
from functools import partial
 | 
			
		||||
from pprint import pformat
 | 
			
		||||
 | 
			
		||||
from anyio_marketstore import open_marketstore_client
 | 
			
		||||
import trio
 | 
			
		||||
import tractor
 | 
			
		||||
import click
 | 
			
		||||
 | 
			
		||||
from anyio_marketstore import open_marketstore_client
 | 
			
		||||
import numpy as np
 | 
			
		||||
 | 
			
		||||
from .marketstore import (
 | 
			
		||||
    get_client,
 | 
			
		||||
| 
						 | 
				
			
			@ -39,6 +38,12 @@ from .marketstore import (
 | 
			
		|||
from ..cli import cli
 | 
			
		||||
from .. import watchlists as wl
 | 
			
		||||
from ..log import get_logger
 | 
			
		||||
from ._sharedmem import (
 | 
			
		||||
    maybe_open_shm_array,
 | 
			
		||||
)
 | 
			
		||||
from ._source import (
 | 
			
		||||
    base_iohlc_dtype,
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
log = get_logger(__name__)
 | 
			
		||||
| 
						 | 
				
			
			@ -52,10 +57,16 @@ log = get_logger(__name__)
 | 
			
		|||
)
 | 
			
		||||
@click.argument('names', nargs=-1)
 | 
			
		||||
@click.pass_obj
 | 
			
		||||
def ms_stream(config: dict, names: List[str], url: str):
 | 
			
		||||
    """Connect to a marketstore time bucket stream for (a set of) symbols(s)
 | 
			
		||||
def ms_stream(
 | 
			
		||||
    config: dict,
 | 
			
		||||
    names: list[str],
 | 
			
		||||
    url: str,
 | 
			
		||||
) -> None:
 | 
			
		||||
    '''
 | 
			
		||||
    Connect to a marketstore time bucket stream for (a set of) symbols(s)
 | 
			
		||||
    and print to console.
 | 
			
		||||
    """
 | 
			
		||||
 | 
			
		||||
    '''
 | 
			
		||||
    async def main():
 | 
			
		||||
        # async for quote in stream_quotes(symbols=names):
 | 
			
		||||
        #    log.info(f"Received quote:\n{quote}")
 | 
			
		||||
| 
						 | 
				
			
			@ -72,7 +83,7 @@ def ms_stream(config: dict, names: List[str], url: str):
 | 
			
		|||
# )
 | 
			
		||||
# @click.argument('names', nargs=-1)
 | 
			
		||||
# @click.pass_obj
 | 
			
		||||
# def ms_destroy(config: dict, names: List[str], url: str) -> None:
 | 
			
		||||
# def ms_destroy(config: dict, names: list[str], url: str) -> None:
 | 
			
		||||
#     """Destroy symbol entries in the local marketstore instance.
 | 
			
		||||
#     """
 | 
			
		||||
#     async def main():
 | 
			
		||||
| 
						 | 
				
			
			@ -113,23 +124,56 @@ def ms_stream(config: dict, names: List[str], url: str):
 | 
			
		|||
    '--port',
 | 
			
		||||
    default=5993
 | 
			
		||||
)
 | 
			
		||||
@click.argument('symbols', nargs=-1)
 | 
			
		||||
@click.pass_obj
 | 
			
		||||
def ms_shell(config, tl, host, port):
 | 
			
		||||
def ms_shell(
 | 
			
		||||
    config,
 | 
			
		||||
    tl,
 | 
			
		||||
    host,
 | 
			
		||||
    port,
 | 
			
		||||
    symbols: list[str],
 | 
			
		||||
):
 | 
			
		||||
    '''
 | 
			
		||||
    Start an IPython shell ready to query the local marketstore db.
 | 
			
		||||
 | 
			
		||||
    '''
 | 
			
		||||
    from piker.data.marketstore import backfill_history_diff
 | 
			
		||||
    from piker._daemon import open_piker_runtime
 | 
			
		||||
 | 
			
		||||
    async def main():
 | 
			
		||||
        nonlocal symbols
 | 
			
		||||
 | 
			
		||||
        async with open_piker_runtime(
 | 
			
		||||
            'ms_shell',
 | 
			
		||||
            enable_modules=['piker.data._ahab'],
 | 
			
		||||
        ):
 | 
			
		||||
            try:
 | 
			
		||||
                await backfill_history_diff()
 | 
			
		||||
            except OSError:
 | 
			
		||||
                # TODO: write magics to query marketstore
 | 
			
		||||
            # from IPython import embed
 | 
			
		||||
            # embed()
 | 
			
		||||
 | 
			
		||||
                sym = symbols[0]
 | 
			
		||||
                symbol, _, broker = sym.rpartition('.')
 | 
			
		||||
                # (maybe) allocate shm array for this broker/symbol which will
 | 
			
		||||
                # be used for fast near-term history capture and processing.
 | 
			
		||||
                shm, opened = maybe_open_shm_array(
 | 
			
		||||
                    key=sym,
 | 
			
		||||
                    dtype=base_iohlc_dtype,
 | 
			
		||||
                )
 | 
			
		||||
                # load anything found in shm
 | 
			
		||||
                from numpy.lib.recfunctions import structured_to_unstructured
 | 
			
		||||
                mxmn = structured_to_unstructured(
 | 
			
		||||
                    shm.array[['low', 'high']],
 | 
			
		||||
                    # dtype=[('mxmn', '<f8'), ('index', '<i8')],
 | 
			
		||||
                ).flatten()
 | 
			
		||||
 | 
			
		||||
                from piker.ui._compression import downsample
 | 
			
		||||
                xd, yd = downsample(
 | 
			
		||||
                    y=mxmn,
 | 
			
		||||
                    x=np.arange(len(mxmn)),
 | 
			
		||||
                    bins=4,
 | 
			
		||||
                )
 | 
			
		||||
                await tractor.breakpoint()
 | 
			
		||||
 | 
			
		||||
    trio.run(main)
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
		Reference in New Issue