Don't open a feed, write or read ohlc in for now

incr_update_backup
Tyler Goodlet 2022-04-23 17:30:00 -04:00
parent 76287a7523
commit c18795e454
1 changed files with 33 additions and 31 deletions

View File

@ -405,7 +405,7 @@ async def open_storage_client(
async def tsdb_history_update( async def tsdb_history_update(
fqsn: str, fqsn: Optional[str] = None,
) -> list[str]: ) -> list[str]:
@ -443,50 +443,52 @@ async def tsdb_history_update(
async with ( async with (
open_storage_client(fqsn) as storage, open_storage_client(fqsn) as storage,
maybe_open_feed( # maybe_open_feed(
[fqsn], # [fqsn],
start_stream=False, # start_stream=False,
) as (feed, stream), # ) as (feed, stream),
): ):
profiler(f'opened feed for {fqsn}') profiler(f'opened feed for {fqsn}')
symbol = feed.symbols.get(fqsn)
if symbol: # to_append = feed.shm.array
fqsn = symbol.front_fqsn() # to_prepend = None
if fqsn:
symbol = feed.symbols.get(fqsn)
if symbol:
fqsn = symbol.front_fqsn()
# diff db history with shm and only write the missing portions
ohlcv = feed.shm.array
# TODO: use pg profiler
tsdb_arrays = await storage.read_ohlcv(fqsn)
# hist diffing
if tsdb_arrays:
onesec = tsdb_arrays[1]
to_append = ohlcv[ohlcv['time'] > onesec['Epoch'][-1]]
to_prepend = ohlcv[ohlcv['time'] < onesec['Epoch'][0]]
profiler('Finished db arrays diffs')
syms = await storage.client.list_symbols() syms = await storage.client.list_symbols()
log.info(f'Existing tsdb symbol set:\n{pformat(syms)}') log.info(f'Existing tsdb symbol set:\n{pformat(syms)}')
profiler(f'listed symbols {syms}') profiler(f'listed symbols {syms}')
# diff db history with shm and only write the missing portions
ohlcv = feed.shm.array
# TODO: use pg profiler
tsdb_arrays = await storage.read_ohlcv(fqsn)
to_append = feed.shm.array
to_prepend = None
from tractor.trionics import ipython_embed from tractor.trionics import ipython_embed
await ipython_embed() await ipython_embed()
# hist diffing
if tsdb_arrays:
onesec = tsdb_arrays[1]
to_append = ohlcv[ohlcv['time'] > onesec['Epoch'][-1]]
to_prepend = ohlcv[ohlcv['time'] < onesec['Epoch'][0]]
profiler('Finished db arrays diffs') # for array in [to_append, to_prepend]:
# if array is None:
# continue
for array in [to_append, to_prepend]: # log.info(
if array is None: # f'Writing datums {array.size} -> to tsdb from shm\n'
continue # )
# await storage.write_ohlcv(fqsn, array)
log.info(
f'Writing datums {array.size} -> to tsdb from shm\n'
)
await storage.write_ohlcv(fqsn, array)
profiler('Finished db writes') profiler('Finished db writes')