Don't open a feed, write or read ohlc in for now
parent
3056bc3143
commit
b8b95f1081
|
@ -405,7 +405,7 @@ async def open_storage_client(
|
||||||
|
|
||||||
|
|
||||||
async def tsdb_history_update(
|
async def tsdb_history_update(
|
||||||
fqsn: str,
|
fqsn: Optional[str] = None,
|
||||||
|
|
||||||
) -> list[str]:
|
) -> list[str]:
|
||||||
|
|
||||||
|
@ -443,34 +443,28 @@ async def tsdb_history_update(
|
||||||
async with (
|
async with (
|
||||||
open_storage_client(fqsn) as storage,
|
open_storage_client(fqsn) as storage,
|
||||||
|
|
||||||
maybe_open_feed(
|
# maybe_open_feed(
|
||||||
[fqsn],
|
# [fqsn],
|
||||||
start_stream=False,
|
# start_stream=False,
|
||||||
|
|
||||||
) as (feed, stream),
|
# ) as (feed, stream),
|
||||||
):
|
):
|
||||||
profiler(f'opened feed for {fqsn}')
|
profiler(f'opened feed for {fqsn}')
|
||||||
|
|
||||||
|
|
||||||
|
# to_append = feed.shm.array
|
||||||
|
# to_prepend = None
|
||||||
|
|
||||||
|
if fqsn:
|
||||||
symbol = feed.symbols.get(fqsn)
|
symbol = feed.symbols.get(fqsn)
|
||||||
if symbol:
|
if symbol:
|
||||||
fqsn = symbol.front_fqsn()
|
fqsn = symbol.front_fqsn()
|
||||||
|
|
||||||
syms = await storage.client.list_symbols()
|
|
||||||
log.info(f'Existing tsdb symbol set:\n{pformat(syms)}')
|
|
||||||
profiler(f'listed symbols {syms}')
|
|
||||||
|
|
||||||
# diff db history with shm and only write the missing portions
|
# diff db history with shm and only write the missing portions
|
||||||
ohlcv = feed.shm.array
|
ohlcv = feed.shm.array
|
||||||
|
|
||||||
# TODO: use pg profiler
|
# TODO: use pg profiler
|
||||||
tsdb_arrays = await storage.read_ohlcv(fqsn)
|
tsdb_arrays = await storage.read_ohlcv(fqsn)
|
||||||
|
|
||||||
to_append = feed.shm.array
|
|
||||||
to_prepend = None
|
|
||||||
|
|
||||||
from tractor.trionics import ipython_embed
|
|
||||||
await ipython_embed()
|
|
||||||
|
|
||||||
# hist diffing
|
# hist diffing
|
||||||
if tsdb_arrays:
|
if tsdb_arrays:
|
||||||
onesec = tsdb_arrays[1]
|
onesec = tsdb_arrays[1]
|
||||||
|
@ -479,14 +473,22 @@ async def tsdb_history_update(
|
||||||
|
|
||||||
profiler('Finished db arrays diffs')
|
profiler('Finished db arrays diffs')
|
||||||
|
|
||||||
for array in [to_append, to_prepend]:
|
syms = await storage.client.list_symbols()
|
||||||
if array is None:
|
log.info(f'Existing tsdb symbol set:\n{pformat(syms)}')
|
||||||
continue
|
profiler(f'listed symbols {syms}')
|
||||||
|
|
||||||
log.info(
|
from tractor.trionics import ipython_embed
|
||||||
f'Writing datums {array.size} -> to tsdb from shm\n'
|
await ipython_embed()
|
||||||
)
|
|
||||||
await storage.write_ohlcv(fqsn, array)
|
|
||||||
|
# for array in [to_append, to_prepend]:
|
||||||
|
# if array is None:
|
||||||
|
# continue
|
||||||
|
|
||||||
|
# log.info(
|
||||||
|
# f'Writing datums {array.size} -> to tsdb from shm\n'
|
||||||
|
# )
|
||||||
|
# await storage.write_ohlcv(fqsn, array)
|
||||||
|
|
||||||
profiler('Finished db writes')
|
profiler('Finished db writes')
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue