diff --git a/piker/data/marketstore.py b/piker/data/marketstore.py index 50804031..ef4a9657 100644 --- a/piker/data/marketstore.py +++ b/piker/data/marketstore.py @@ -405,7 +405,7 @@ async def open_storage_client( async def tsdb_history_update( - fqsn: str, + fqsn: Optional[str] = None, ) -> list[str]: @@ -443,50 +443,52 @@ async def tsdb_history_update( async with ( open_storage_client(fqsn) as storage, - maybe_open_feed( - [fqsn], - start_stream=False, + # maybe_open_feed( + # [fqsn], + # start_stream=False, - ) as (feed, stream), + # ) as (feed, stream), ): profiler(f'opened feed for {fqsn}') - symbol = feed.symbols.get(fqsn) - if symbol: - fqsn = symbol.front_fqsn() + + # to_append = feed.shm.array + # to_prepend = None + + if fqsn: + symbol = feed.symbols.get(fqsn) + if symbol: + fqsn = symbol.front_fqsn() + + # diff db history with shm and only write the missing portions + ohlcv = feed.shm.array + + # TODO: use pg profiler + tsdb_arrays = await storage.read_ohlcv(fqsn) + # hist diffing + if tsdb_arrays: + onesec = tsdb_arrays[1] + to_append = ohlcv[ohlcv['time'] > onesec['Epoch'][-1]] + to_prepend = ohlcv[ohlcv['time'] < onesec['Epoch'][0]] + + profiler('Finished db arrays diffs') syms = await storage.client.list_symbols() log.info(f'Existing tsdb symbol set:\n{pformat(syms)}') profiler(f'listed symbols {syms}') - # diff db history with shm and only write the missing portions - ohlcv = feed.shm.array - - # TODO: use pg profiler - tsdb_arrays = await storage.read_ohlcv(fqsn) - - to_append = feed.shm.array - to_prepend = None - from tractor.trionics import ipython_embed await ipython_embed() - # hist diffing - if tsdb_arrays: - onesec = tsdb_arrays[1] - to_append = ohlcv[ohlcv['time'] > onesec['Epoch'][-1]] - to_prepend = ohlcv[ohlcv['time'] < onesec['Epoch'][0]] - profiler('Finished db arrays diffs') + # for array in [to_append, to_prepend]: + # if array is None: + # continue - for array in [to_append, to_prepend]: - if array is None: - continue - - log.info( - f'Writing datums {array.size} -> to tsdb from shm\n' - ) - await storage.write_ohlcv(fqsn, array) + # log.info( + # f'Writing datums {array.size} -> to tsdb from shm\n' + # ) + # await storage.write_ohlcv(fqsn, array) profiler('Finished db writes')