Add back fqsn passthrough and feed opening

incr_update_backup
Tyler Goodlet 2022-04-29 11:26:49 -04:00
parent 8e5f5b6be6
commit 79eff13e76
2 changed files with 14 additions and 10 deletions

View File

@ -148,7 +148,7 @@ def storesh(
enable_modules=['piker.data._ahab'],
):
symbol = symbols[0]
await tsdb_history_update()
await tsdb_history_update(symbol)
trio.run(main)

View File

@ -270,6 +270,7 @@ class Storage:
self,
fqsn: str,
timeframe: Optional[Union[int, str]] = None,
end: Optional[int] = None,
) -> tuple[
MarketstoreClient,
@ -287,6 +288,7 @@ class Storage:
symbols=fqsn,
timeframe=tfstr,
attrgroup='OHLCV',
end=end,
# limit_from_start=True,
# TODO: figure the max limit here given the
@ -346,6 +348,7 @@ class Storage:
self,
fqsn: str,
ohlcv: np.ndarray,
append_and_duplicate: bool = True,
) -> None:
# build mkts schema compat array for writing
@ -373,7 +376,7 @@ class Storage:
# NOTE: will will append duplicates
# for the same timestamp-index.
# TODO: pre deduplicate?
isvariablelength=True,
isvariablelength=append_and_duplicate,
)
log.info(
@ -443,17 +446,17 @@ async def tsdb_history_update(
async with (
open_storage_client(fqsn) as storage,
# maybe_open_feed(
# [fqsn],
# start_stream=False,
maybe_open_feed(
[fqsn],
start_stream=False,
# ) as (feed, stream),
) as (feed, stream),
):
profiler(f'opened feed for {fqsn}')
# to_append = feed.shm.array
# to_prepend = None
to_append = feed.shm.array
to_prepend = None
if fqsn:
symbol = feed.symbols.get(fqsn)
@ -477,10 +480,11 @@ async def tsdb_history_update(
log.info(f'Existing tsdb symbol set:\n{pformat(syms)}')
profiler(f'listed symbols {syms}')
# TODO: ask if user wants to write history for detected
# available shm buffers?
from tractor.trionics import ipython_embed
await ipython_embed()
# for array in [to_append, to_prepend]:
# if array is None:
# continue
@ -490,7 +494,7 @@ async def tsdb_history_update(
# )
# await storage.write_ohlcv(fqsn, array)
profiler('Finished db writes')
# profiler('Finished db writes')
async def ingest_quote_stream(