Load any symbol-matching shm array if no `marketstored` found
							parent
							
								
									e33d0aac15
								
							
						
					
					
						commit
						c5be35dad4
					
				| 
						 | 
					@ -18,15 +18,14 @@
 | 
				
			||||||
marketstore cli.
 | 
					marketstore cli.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
"""
 | 
					"""
 | 
				
			||||||
from typing import List
 | 
					 | 
				
			||||||
from functools import partial
 | 
					from functools import partial
 | 
				
			||||||
from pprint import pformat
 | 
					from pprint import pformat
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					from anyio_marketstore import open_marketstore_client
 | 
				
			||||||
import trio
 | 
					import trio
 | 
				
			||||||
import tractor
 | 
					import tractor
 | 
				
			||||||
import click
 | 
					import click
 | 
				
			||||||
 | 
					import numpy as np
 | 
				
			||||||
from anyio_marketstore import open_marketstore_client
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
from .marketstore import (
 | 
					from .marketstore import (
 | 
				
			||||||
    get_client,
 | 
					    get_client,
 | 
				
			||||||
| 
						 | 
					@ -39,6 +38,12 @@ from .marketstore import (
 | 
				
			||||||
from ..cli import cli
 | 
					from ..cli import cli
 | 
				
			||||||
from .. import watchlists as wl
 | 
					from .. import watchlists as wl
 | 
				
			||||||
from ..log import get_logger
 | 
					from ..log import get_logger
 | 
				
			||||||
 | 
					from ._sharedmem import (
 | 
				
			||||||
 | 
					    maybe_open_shm_array,
 | 
				
			||||||
 | 
					)
 | 
				
			||||||
 | 
					from ._source import (
 | 
				
			||||||
 | 
					    base_iohlc_dtype,
 | 
				
			||||||
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
log = get_logger(__name__)
 | 
					log = get_logger(__name__)
 | 
				
			||||||
| 
						 | 
					@ -52,10 +57,16 @@ log = get_logger(__name__)
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
@click.argument('names', nargs=-1)
 | 
					@click.argument('names', nargs=-1)
 | 
				
			||||||
@click.pass_obj
 | 
					@click.pass_obj
 | 
				
			||||||
def ms_stream(config: dict, names: List[str], url: str):
 | 
					def ms_stream(
 | 
				
			||||||
    """Connect to a marketstore time bucket stream for (a set of) symbols(s)
 | 
					    config: dict,
 | 
				
			||||||
 | 
					    names: list[str],
 | 
				
			||||||
 | 
					    url: str,
 | 
				
			||||||
 | 
					) -> None:
 | 
				
			||||||
 | 
					    '''
 | 
				
			||||||
 | 
					    Connect to a marketstore time bucket stream for (a set of) symbols(s)
 | 
				
			||||||
    and print to console.
 | 
					    and print to console.
 | 
				
			||||||
    """
 | 
					
 | 
				
			||||||
 | 
					    '''
 | 
				
			||||||
    async def main():
 | 
					    async def main():
 | 
				
			||||||
        # async for quote in stream_quotes(symbols=names):
 | 
					        # async for quote in stream_quotes(symbols=names):
 | 
				
			||||||
        #    log.info(f"Received quote:\n{quote}")
 | 
					        #    log.info(f"Received quote:\n{quote}")
 | 
				
			||||||
| 
						 | 
					@ -72,7 +83,7 @@ def ms_stream(config: dict, names: List[str], url: str):
 | 
				
			||||||
# )
 | 
					# )
 | 
				
			||||||
# @click.argument('names', nargs=-1)
 | 
					# @click.argument('names', nargs=-1)
 | 
				
			||||||
# @click.pass_obj
 | 
					# @click.pass_obj
 | 
				
			||||||
# def ms_destroy(config: dict, names: List[str], url: str) -> None:
 | 
					# def ms_destroy(config: dict, names: list[str], url: str) -> None:
 | 
				
			||||||
#     """Destroy symbol entries in the local marketstore instance.
 | 
					#     """Destroy symbol entries in the local marketstore instance.
 | 
				
			||||||
#     """
 | 
					#     """
 | 
				
			||||||
#     async def main():
 | 
					#     async def main():
 | 
				
			||||||
| 
						 | 
					@ -113,23 +124,56 @@ def ms_stream(config: dict, names: List[str], url: str):
 | 
				
			||||||
    '--port',
 | 
					    '--port',
 | 
				
			||||||
    default=5993
 | 
					    default=5993
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
 | 
					@click.argument('symbols', nargs=-1)
 | 
				
			||||||
@click.pass_obj
 | 
					@click.pass_obj
 | 
				
			||||||
def ms_shell(config, tl, host, port):
 | 
					def ms_shell(
 | 
				
			||||||
 | 
					    config,
 | 
				
			||||||
 | 
					    tl,
 | 
				
			||||||
 | 
					    host,
 | 
				
			||||||
 | 
					    port,
 | 
				
			||||||
 | 
					    symbols: list[str],
 | 
				
			||||||
 | 
					):
 | 
				
			||||||
    '''
 | 
					    '''
 | 
				
			||||||
    Start an IPython shell ready to query the local marketstore db.
 | 
					    Start an IPython shell ready to query the local marketstore db.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    '''
 | 
					    '''
 | 
				
			||||||
    from piker.data.marketstore import backfill_history_diff
 | 
					    from piker.data.marketstore import backfill_history_diff
 | 
				
			||||||
    from piker._daemon import open_piker_runtime
 | 
					    from piker._daemon import open_piker_runtime
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    async def main():
 | 
					    async def main():
 | 
				
			||||||
 | 
					        nonlocal symbols
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        async with open_piker_runtime(
 | 
					        async with open_piker_runtime(
 | 
				
			||||||
            'ms_shell',
 | 
					            'ms_shell',
 | 
				
			||||||
            enable_modules=['piker.data._ahab'],
 | 
					            enable_modules=['piker.data._ahab'],
 | 
				
			||||||
        ):
 | 
					        ):
 | 
				
			||||||
 | 
					            try:
 | 
				
			||||||
                await backfill_history_diff()
 | 
					                await backfill_history_diff()
 | 
				
			||||||
 | 
					            except OSError:
 | 
				
			||||||
                # TODO: write magics to query marketstore
 | 
					                # TODO: write magics to query marketstore
 | 
				
			||||||
            # from IPython import embed
 | 
					
 | 
				
			||||||
            # embed()
 | 
					                sym = symbols[0]
 | 
				
			||||||
 | 
					                symbol, _, broker = sym.rpartition('.')
 | 
				
			||||||
 | 
					                # (maybe) allocate shm array for this broker/symbol which will
 | 
				
			||||||
 | 
					                # be used for fast near-term history capture and processing.
 | 
				
			||||||
 | 
					                shm, opened = maybe_open_shm_array(
 | 
				
			||||||
 | 
					                    key=sym,
 | 
				
			||||||
 | 
					                    dtype=base_iohlc_dtype,
 | 
				
			||||||
 | 
					                )
 | 
				
			||||||
 | 
					                # load anything found in shm
 | 
				
			||||||
 | 
					                from numpy.lib.recfunctions import structured_to_unstructured
 | 
				
			||||||
 | 
					                mxmn = structured_to_unstructured(
 | 
				
			||||||
 | 
					                    shm.array[['low', 'high']],
 | 
				
			||||||
 | 
					                    # dtype=[('mxmn', '<f8'), ('index', '<i8')],
 | 
				
			||||||
 | 
					                ).flatten()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					                from piker.ui._compression import downsample
 | 
				
			||||||
 | 
					                xd, yd = downsample(
 | 
				
			||||||
 | 
					                    y=mxmn,
 | 
				
			||||||
 | 
					                    x=np.arange(len(mxmn)),
 | 
				
			||||||
 | 
					                    bins=4,
 | 
				
			||||||
 | 
					                )
 | 
				
			||||||
 | 
					                await tractor.breakpoint()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    trio.run(main)
 | 
					    trio.run(main)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
		Loading…
	
		Reference in New Issue