Add back fqsn passthrough and feed opening

l1_precision_fix
Tyler Goodlet 2022-04-29 11:26:49 -04:00
parent 49509d55d2
commit d77cfa3587
2 changed files with 14 additions and 10 deletions

View File

@ -148,7 +148,7 @@ def storesh(
enable_modules=['piker.data._ahab'], enable_modules=['piker.data._ahab'],
): ):
symbol = symbols[0] symbol = symbols[0]
await tsdb_history_update() await tsdb_history_update(symbol)
trio.run(main) trio.run(main)

View File

@ -270,6 +270,7 @@ class Storage:
self, self,
fqsn: str, fqsn: str,
timeframe: Optional[Union[int, str]] = None, timeframe: Optional[Union[int, str]] = None,
end: Optional[int] = None,
) -> tuple[ ) -> tuple[
MarketstoreClient, MarketstoreClient,
@ -287,6 +288,7 @@ class Storage:
symbols=fqsn, symbols=fqsn,
timeframe=tfstr, timeframe=tfstr,
attrgroup='OHLCV', attrgroup='OHLCV',
end=end,
# limit_from_start=True, # limit_from_start=True,
# TODO: figure the max limit here given the # TODO: figure the max limit here given the
@ -346,6 +348,7 @@ class Storage:
self, self,
fqsn: str, fqsn: str,
ohlcv: np.ndarray, ohlcv: np.ndarray,
append_and_duplicate: bool = True,
) -> None: ) -> None:
# build mkts schema compat array for writing # build mkts schema compat array for writing
@ -373,7 +376,7 @@ class Storage:
# NOTE: will will append duplicates # NOTE: will will append duplicates
# for the same timestamp-index. # for the same timestamp-index.
# TODO: pre deduplicate? # TODO: pre deduplicate?
isvariablelength=True, isvariablelength=append_and_duplicate,
) )
log.info( log.info(
@ -443,17 +446,17 @@ async def tsdb_history_update(
async with ( async with (
open_storage_client(fqsn) as storage, open_storage_client(fqsn) as storage,
# maybe_open_feed( maybe_open_feed(
# [fqsn], [fqsn],
# start_stream=False, start_stream=False,
# ) as (feed, stream), ) as (feed, stream),
): ):
profiler(f'opened feed for {fqsn}') profiler(f'opened feed for {fqsn}')
# to_append = feed.shm.array to_append = feed.shm.array
# to_prepend = None to_prepend = None
if fqsn: if fqsn:
symbol = feed.symbols.get(fqsn) symbol = feed.symbols.get(fqsn)
@ -477,10 +480,11 @@ async def tsdb_history_update(
log.info(f'Existing tsdb symbol set:\n{pformat(syms)}') log.info(f'Existing tsdb symbol set:\n{pformat(syms)}')
profiler(f'listed symbols {syms}') profiler(f'listed symbols {syms}')
# TODO: ask if user wants to write history for detected
# available shm buffers?
from tractor.trionics import ipython_embed from tractor.trionics import ipython_embed
await ipython_embed() await ipython_embed()
# for array in [to_append, to_prepend]: # for array in [to_append, to_prepend]:
# if array is None: # if array is None:
# continue # continue
@ -490,7 +494,7 @@ async def tsdb_history_update(
# ) # )
# await storage.write_ohlcv(fqsn, array) # await storage.write_ohlcv(fqsn, array)
profiler('Finished db writes') # profiler('Finished db writes')
async def ingest_quote_stream( async def ingest_quote_stream(