Drop `ms-shell`, add `piker storesh` cmd
							parent
							
								
									a682887e63
								
							
						
					
					
						commit
						bb13f76375
					
				| 
						 | 
				
			
			@ -126,7 +126,7 @@ def ms_stream(
 | 
			
		|||
)
 | 
			
		||||
@click.argument('symbols', nargs=-1)
 | 
			
		||||
@click.pass_obj
 | 
			
		||||
def ms_shell(
 | 
			
		||||
def storesh(
 | 
			
		||||
    config,
 | 
			
		||||
    tl,
 | 
			
		||||
    host,
 | 
			
		||||
| 
						 | 
				
			
			@ -137,43 +137,18 @@ def ms_shell(
 | 
			
		|||
    Start an IPython shell ready to query the local marketstore db.
 | 
			
		||||
 | 
			
		||||
    '''
 | 
			
		||||
    from piker.data.marketstore import backfill_history_diff
 | 
			
		||||
    from piker.data.marketstore import tsdb_history_update
 | 
			
		||||
    from piker._daemon import open_piker_runtime
 | 
			
		||||
 | 
			
		||||
    async def main():
 | 
			
		||||
        nonlocal symbols
 | 
			
		||||
 | 
			
		||||
        async with open_piker_runtime(
 | 
			
		||||
            'ms_shell',
 | 
			
		||||
            'storesh',
 | 
			
		||||
            enable_modules=['piker.data._ahab'],
 | 
			
		||||
        ):
 | 
			
		||||
            try:
 | 
			
		||||
                await backfill_history_diff()
 | 
			
		||||
            except OSError:
 | 
			
		||||
                # TODO: write magics to query marketstore
 | 
			
		||||
 | 
			
		||||
                sym = symbols[0]
 | 
			
		||||
                symbol, _, broker = sym.rpartition('.')
 | 
			
		||||
                # (maybe) allocate shm array for this broker/symbol which will
 | 
			
		||||
                # be used for fast near-term history capture and processing.
 | 
			
		||||
                shm, opened = maybe_open_shm_array(
 | 
			
		||||
                    key=sym,
 | 
			
		||||
                    dtype=base_iohlc_dtype,
 | 
			
		||||
                )
 | 
			
		||||
                # load anything found in shm
 | 
			
		||||
                from numpy.lib.recfunctions import structured_to_unstructured
 | 
			
		||||
                mxmn = structured_to_unstructured(
 | 
			
		||||
                    shm.array[['low', 'high']],
 | 
			
		||||
                    # dtype=[('mxmn', '<f8'), ('index', '<i8')],
 | 
			
		||||
                ).flatten()
 | 
			
		||||
 | 
			
		||||
                from piker.ui._compression import downsample
 | 
			
		||||
                xd, yd = downsample(
 | 
			
		||||
                    y=mxmn,
 | 
			
		||||
                    x=np.arange(len(mxmn)),
 | 
			
		||||
                    bins=4,
 | 
			
		||||
                )
 | 
			
		||||
                await tractor.breakpoint()
 | 
			
		||||
            symbol = symbols[0]
 | 
			
		||||
            await tsdb_history_update(symbol)
 | 
			
		||||
 | 
			
		||||
    trio.run(main)
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -184,10 +159,11 @@ def ms_shell(
 | 
			
		|||
@click.argument('name', nargs=1, required=True)
 | 
			
		||||
@click.pass_obj
 | 
			
		||||
def ingest(config, name, test_file, tl):
 | 
			
		||||
    """Ingest real-time broker quotes and ticks to a marketstore instance.
 | 
			
		||||
    """
 | 
			
		||||
    '''
 | 
			
		||||
    Ingest real-time broker quotes and ticks to a marketstore instance.
 | 
			
		||||
 | 
			
		||||
    '''
 | 
			
		||||
    # global opts
 | 
			
		||||
    brokermods = config['brokermods']
 | 
			
		||||
    loglevel = config['loglevel']
 | 
			
		||||
    tractorloglevel = config['tractorloglevel']
 | 
			
		||||
    # log = config['log']
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -444,6 +444,9 @@ async def tsdb_history_update(
 | 
			
		|||
                if err:
 | 
			
		||||
                    raise MarketStoreError(err)
 | 
			
		||||
 | 
			
		||||
        from tractor.trionics import ipython_embed
 | 
			
		||||
        await ipython_embed()
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
async def ingest_quote_stream(
 | 
			
		||||
    symbols: list[str],
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
		Reference in New Issue