diff --git a/piker/data/cli.py b/piker/data/cli.py index 6f4e169d..6984d9ff 100644 --- a/piker/data/cli.py +++ b/piker/data/cli.py @@ -18,34 +18,22 @@ marketstore cli. """ -from functools import partial -from pprint import ( - pformat, - pprint, -) - -from anyio_marketstore import open_marketstore_client import trio import tractor import click -import numpy as np from ..service.marketstore import ( - get_client, + # get_client, # stream_quotes, ingest_quote_stream, # _url, - _tick_tbk_ids, - mk_tbk, + # _tick_tbk_ids, + # mk_tbk, ) from ..cli import cli from .. import watchlists as wl -from ..log import get_logger -from ._sharedmem import ( - maybe_open_shm_array, -) -from ._source import ( - base_iohlc_dtype, +from ..log import ( + get_logger, ) @@ -92,16 +80,16 @@ def ms_stream( # async def main(): # nonlocal names # async with get_client(url) as client: -# +# # if not names: # names = await client.list_symbols() -# +# # # default is to wipe db entirely. # answer = input( # "This will entirely wipe you local marketstore db @ " # f"{url} of the following symbols:\n {pformat(names)}" # "\n\nDelete [N/y]?\n") -# +# # if answer == 'y': # for sym in names: # # tbk = _tick_tbk.format(sym) @@ -110,7 +98,7 @@ def ms_stream( # await client.destroy(mk_tbk(tbk)) # else: # print("Nothing deleted.") -# +# # tractor.run(main) @@ -148,7 +136,7 @@ def storesh( ): symbol = symbols[0] - async with open_tsdb_client(symbol) as storage: + async with open_tsdb_client(symbol): # TODO: ask if user wants to write history for detected # available shm buffers? from tractor.trionics import ipython_embed @@ -186,7 +174,7 @@ def storage( Start an IPython shell ready to query the local marketstore db. ''' - from piker.data.marketstore import open_tsdb_client + from piker.service.marketstore import open_tsdb_client from piker.service import open_piker_runtime async def main(): @@ -201,9 +189,28 @@ def storage( if delete: for fqsn in symbols: syms = await storage.client.list_symbols() - breakpoint() - await storage.delete_ts(fqsn, 60) - await storage.delete_ts(fqsn, 1) + + resp60s = await storage.delete_ts(fqsn, 60) + + msgish = resp60s.ListFields()[0][1] + if 'error' in str(msgish): + + # TODO: MEGA LOL, apparently the symbols don't + # flush out until you refresh something or other + # (maybe the WALFILE)... #lelandorlulzone, classic + # alpaca(Rtm) design here .. + # well, if we ever can make this work we + # probably want to dogsplain the real reason + # for the delete errurz..llululu + if fqsn not in syms: + log.error(f'Pair {fqsn} dne in DB') + + log.error(f'Deletion error: {fqsn}\n{msgish}') + + resp1s = await storage.delete_ts(fqsn, 1) + msgish = resp1s.ListFields()[0][1] + if 'error' in str(msgish): + log.error(f'Deletion error: {fqsn}\n{msgish}') trio.run(main) @@ -237,7 +244,7 @@ def ingest(config, name, test_file, tl): async def entry_point(): async with tractor.open_nursery() as n: - for provider, symbols in grouped_syms.items(): + for provider, symbols in grouped_syms.items(): await n.run_in_actor( ingest_quote_stream, name='ingest_marketstore',