Don't open a feed, write or read ohlc in for now

m4_corrections
Tyler Goodlet 2022-04-23 17:30:00 -04:00
parent 2250566e72
commit 0b7961bb09
1 changed files with 33 additions and 31 deletions

View File

@ -405,7 +405,7 @@ async def open_storage_client(
async def tsdb_history_update(
fqsn: str,
fqsn: Optional[str] = None,
) -> list[str]:
@ -443,50 +443,52 @@ async def tsdb_history_update(
async with (
open_storage_client(fqsn) as storage,
maybe_open_feed(
[fqsn],
start_stream=False,
# maybe_open_feed(
# [fqsn],
# start_stream=False,
) as (feed, stream),
# ) as (feed, stream),
):
profiler(f'opened feed for {fqsn}')
symbol = feed.symbols.get(fqsn)
if symbol:
fqsn = symbol.front_fqsn()
# to_append = feed.shm.array
# to_prepend = None
if fqsn:
symbol = feed.symbols.get(fqsn)
if symbol:
fqsn = symbol.front_fqsn()
# diff db history with shm and only write the missing portions
ohlcv = feed.shm.array
# TODO: use pg profiler
tsdb_arrays = await storage.read_ohlcv(fqsn)
# hist diffing
if tsdb_arrays:
onesec = tsdb_arrays[1]
to_append = ohlcv[ohlcv['time'] > onesec['Epoch'][-1]]
to_prepend = ohlcv[ohlcv['time'] < onesec['Epoch'][0]]
profiler('Finished db arrays diffs')
syms = await storage.client.list_symbols()
log.info(f'Existing tsdb symbol set:\n{pformat(syms)}')
profiler(f'listed symbols {syms}')
# diff db history with shm and only write the missing portions
ohlcv = feed.shm.array
# TODO: use pg profiler
tsdb_arrays = await storage.read_ohlcv(fqsn)
to_append = feed.shm.array
to_prepend = None
from tractor.trionics import ipython_embed
await ipython_embed()
# hist diffing
if tsdb_arrays:
onesec = tsdb_arrays[1]
to_append = ohlcv[ohlcv['time'] > onesec['Epoch'][-1]]
to_prepend = ohlcv[ohlcv['time'] < onesec['Epoch'][0]]
profiler('Finished db arrays diffs')
# for array in [to_append, to_prepend]:
# if array is None:
# continue
for array in [to_append, to_prepend]:
if array is None:
continue
log.info(
f'Writing datums {array.size} -> to tsdb from shm\n'
)
await storage.write_ohlcv(fqsn, array)
# log.info(
# f'Writing datums {array.size} -> to tsdb from shm\n'
# )
# await storage.write_ohlcv(fqsn, array)
profiler('Finished db writes')