From c18795e454030555eb09240de4dc8a3bb3b6cbde Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sat, 23 Apr 2022 17:30:00 -0400 Subject: [PATCH] Don't open a feed, write or read ohlc in for now --- piker/data/marketstore.py | 64 ++++++++++++++++++++------------------- 1 file changed, 33 insertions(+), 31 deletions(-) diff --git a/piker/data/marketstore.py b/piker/data/marketstore.py index 50804031..ef4a9657 100644 --- a/piker/data/marketstore.py +++ b/piker/data/marketstore.py @@ -405,7 +405,7 @@ async def open_storage_client( async def tsdb_history_update( - fqsn: str, + fqsn: Optional[str] = None, ) -> list[str]: @@ -443,50 +443,52 @@ async def tsdb_history_update( async with ( open_storage_client(fqsn) as storage, - maybe_open_feed( - [fqsn], - start_stream=False, + # maybe_open_feed( + # [fqsn], + # start_stream=False, - ) as (feed, stream), + # ) as (feed, stream), ): profiler(f'opened feed for {fqsn}') - symbol = feed.symbols.get(fqsn) - if symbol: - fqsn = symbol.front_fqsn() + + # to_append = feed.shm.array + # to_prepend = None + + if fqsn: + symbol = feed.symbols.get(fqsn) + if symbol: + fqsn = symbol.front_fqsn() + + # diff db history with shm and only write the missing portions + ohlcv = feed.shm.array + + # TODO: use pg profiler + tsdb_arrays = await storage.read_ohlcv(fqsn) + # hist diffing + if tsdb_arrays: + onesec = tsdb_arrays[1] + to_append = ohlcv[ohlcv['time'] > onesec['Epoch'][-1]] + to_prepend = ohlcv[ohlcv['time'] < onesec['Epoch'][0]] + + profiler('Finished db arrays diffs') syms = await storage.client.list_symbols() log.info(f'Existing tsdb symbol set:\n{pformat(syms)}') profiler(f'listed symbols {syms}') - # diff db history with shm and only write the missing portions - ohlcv = feed.shm.array - - # TODO: use pg profiler - tsdb_arrays = await storage.read_ohlcv(fqsn) - - to_append = feed.shm.array - to_prepend = None - from tractor.trionics import ipython_embed await ipython_embed() - # hist diffing - if tsdb_arrays: - onesec = tsdb_arrays[1] - to_append = ohlcv[ohlcv['time'] > onesec['Epoch'][-1]] - to_prepend = ohlcv[ohlcv['time'] < onesec['Epoch'][0]] - profiler('Finished db arrays diffs') + # for array in [to_append, to_prepend]: + # if array is None: + # continue - for array in [to_append, to_prepend]: - if array is None: - continue - - log.info( - f'Writing datums {array.size} -> to tsdb from shm\n' - ) - await storage.write_ohlcv(fqsn, array) + # log.info( + # f'Writing datums {array.size} -> to tsdb from shm\n' + # ) + # await storage.write_ohlcv(fqsn, array) profiler('Finished db writes')