Attempt to report `piker storage -d <fqsn>` errors

Not really sure there's much we can do besides dump Grpc stuff when we
detect an "error" `str` for the moment..

Either way leave a buncha complaints (como siempre) and do linting
fixups..
service_subpkg
Tyler Goodlet 2023-03-08 20:06:27 -05:00
parent cec2967071
commit 441243f83b
1 changed files with 34 additions and 27 deletions

View File

@ -18,34 +18,22 @@
marketstore cli. marketstore cli.
""" """
from functools import partial
from pprint import (
pformat,
pprint,
)
from anyio_marketstore import open_marketstore_client
import trio import trio
import tractor import tractor
import click import click
import numpy as np
from ..service.marketstore import ( from ..service.marketstore import (
get_client, # get_client,
# stream_quotes, # stream_quotes,
ingest_quote_stream, ingest_quote_stream,
# _url, # _url,
_tick_tbk_ids, # _tick_tbk_ids,
mk_tbk, # mk_tbk,
) )
from ..cli import cli from ..cli import cli
from .. import watchlists as wl from .. import watchlists as wl
from ..log import get_logger from ..log import (
from ._sharedmem import ( get_logger,
maybe_open_shm_array,
)
from ._source import (
base_iohlc_dtype,
) )
@ -92,16 +80,16 @@ def ms_stream(
# async def main(): # async def main():
# nonlocal names # nonlocal names
# async with get_client(url) as client: # async with get_client(url) as client:
# #
# if not names: # if not names:
# names = await client.list_symbols() # names = await client.list_symbols()
# #
# # default is to wipe db entirely. # # default is to wipe db entirely.
# answer = input( # answer = input(
# "This will entirely wipe you local marketstore db @ " # "This will entirely wipe you local marketstore db @ "
# f"{url} of the following symbols:\n {pformat(names)}" # f"{url} of the following symbols:\n {pformat(names)}"
# "\n\nDelete [N/y]?\n") # "\n\nDelete [N/y]?\n")
# #
# if answer == 'y': # if answer == 'y':
# for sym in names: # for sym in names:
# # tbk = _tick_tbk.format(sym) # # tbk = _tick_tbk.format(sym)
@ -110,7 +98,7 @@ def ms_stream(
# await client.destroy(mk_tbk(tbk)) # await client.destroy(mk_tbk(tbk))
# else: # else:
# print("Nothing deleted.") # print("Nothing deleted.")
# #
# tractor.run(main) # tractor.run(main)
@ -148,7 +136,7 @@ def storesh(
): ):
symbol = symbols[0] symbol = symbols[0]
async with open_tsdb_client(symbol) as storage: async with open_tsdb_client(symbol):
# TODO: ask if user wants to write history for detected # TODO: ask if user wants to write history for detected
# available shm buffers? # available shm buffers?
from tractor.trionics import ipython_embed from tractor.trionics import ipython_embed
@ -186,7 +174,7 @@ def storage(
Start an IPython shell ready to query the local marketstore db. Start an IPython shell ready to query the local marketstore db.
''' '''
from piker.data.marketstore import open_tsdb_client from piker.service.marketstore import open_tsdb_client
from piker.service import open_piker_runtime from piker.service import open_piker_runtime
async def main(): async def main():
@ -201,9 +189,28 @@ def storage(
if delete: if delete:
for fqsn in symbols: for fqsn in symbols:
syms = await storage.client.list_symbols() syms = await storage.client.list_symbols()
breakpoint()
await storage.delete_ts(fqsn, 60) resp60s = await storage.delete_ts(fqsn, 60)
await storage.delete_ts(fqsn, 1)
msgish = resp60s.ListFields()[0][1]
if 'error' in str(msgish):
# TODO: MEGA LOL, apparently the symbols don't
# flush out until you refresh something or other
# (maybe the WALFILE)... #lelandorlulzone, classic
# alpaca(Rtm) design here ..
# well, if we ever can make this work we
# probably want to dogsplain the real reason
# for the delete errurz..llululu
if fqsn not in syms:
log.error(f'Pair {fqsn} dne in DB')
log.error(f'Deletion error: {fqsn}\n{msgish}')
resp1s = await storage.delete_ts(fqsn, 1)
msgish = resp1s.ListFields()[0][1]
if 'error' in str(msgish):
log.error(f'Deletion error: {fqsn}\n{msgish}')
trio.run(main) trio.run(main)
@ -237,7 +244,7 @@ def ingest(config, name, test_file, tl):
async def entry_point(): async def entry_point():
async with tractor.open_nursery() as n: async with tractor.open_nursery() as n:
for provider, symbols in grouped_syms.items(): for provider, symbols in grouped_syms.items():
await n.run_in_actor( await n.run_in_actor(
ingest_quote_stream, ingest_quote_stream,
name='ingest_marketstore', name='ingest_marketstore',