From 79eff13e760339c28e1e19ae6cd1c1965c6251b0 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 29 Apr 2022 11:26:49 -0400 Subject: [PATCH] Add back fqsn passthrough and feed opening --- piker/data/cli.py | 2 +- piker/data/marketstore.py | 22 +++++++++++++--------- 2 files changed, 14 insertions(+), 10 deletions(-) diff --git a/piker/data/cli.py b/piker/data/cli.py index 90992201..554048a4 100644 --- a/piker/data/cli.py +++ b/piker/data/cli.py @@ -148,7 +148,7 @@ def storesh( enable_modules=['piker.data._ahab'], ): symbol = symbols[0] - await tsdb_history_update() + await tsdb_history_update(symbol) trio.run(main) diff --git a/piker/data/marketstore.py b/piker/data/marketstore.py index ef4a9657..95fd80ee 100644 --- a/piker/data/marketstore.py +++ b/piker/data/marketstore.py @@ -270,6 +270,7 @@ class Storage: self, fqsn: str, timeframe: Optional[Union[int, str]] = None, + end: Optional[int] = None, ) -> tuple[ MarketstoreClient, @@ -287,6 +288,7 @@ class Storage: symbols=fqsn, timeframe=tfstr, attrgroup='OHLCV', + end=end, # limit_from_start=True, # TODO: figure the max limit here given the @@ -346,6 +348,7 @@ class Storage: self, fqsn: str, ohlcv: np.ndarray, + append_and_duplicate: bool = True, ) -> None: # build mkts schema compat array for writing @@ -373,7 +376,7 @@ class Storage: # NOTE: will will append duplicates # for the same timestamp-index. # TODO: pre deduplicate? - isvariablelength=True, + isvariablelength=append_and_duplicate, ) log.info( @@ -443,17 +446,17 @@ async def tsdb_history_update( async with ( open_storage_client(fqsn) as storage, - # maybe_open_feed( - # [fqsn], - # start_stream=False, + maybe_open_feed( + [fqsn], + start_stream=False, - # ) as (feed, stream), + ) as (feed, stream), ): profiler(f'opened feed for {fqsn}') - # to_append = feed.shm.array - # to_prepend = None + to_append = feed.shm.array + to_prepend = None if fqsn: symbol = feed.symbols.get(fqsn) @@ -477,10 +480,11 @@ async def tsdb_history_update( log.info(f'Existing tsdb symbol set:\n{pformat(syms)}') profiler(f'listed symbols {syms}') + # TODO: ask if user wants to write history for detected + # available shm buffers? from tractor.trionics import ipython_embed await ipython_embed() - # for array in [to_append, to_prepend]: # if array is None: # continue @@ -490,7 +494,7 @@ async def tsdb_history_update( # ) # await storage.write_ohlcv(fqsn, array) - profiler('Finished db writes') + # profiler('Finished db writes') async def ingest_quote_stream(