Add latency measures around diffs/writes to mkts

incr_update_backup
Tyler Goodlet 2022-02-28 07:41:37 -05:00
parent 7d628c4059
commit ae8170204f
2 changed files with 67 additions and 45 deletions


@@ -119,14 +119,14 @@ def ms_shell(config, tl, host, port):
     Start an IPython shell ready to query the local marketstore db.
     '''
-    from piker.data.marketstore import backfill_history
+    from piker.data.marketstore import backfill_history_diff
     from piker._daemon import open_piker_runtime

     async def main():
         async with open_piker_runtime(
             'ms_shell',
             enable_modules=['piker.data._ahab'],
         ):
-            await backfill_history()
+            await backfill_history_diff()
             # TODO: write magics to query marketstore
             # from IPython import embed
             # embed()
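For reference, a minimal sketch of driving the renamed entrypoint outside the IPython shell; `open_piker_runtime` and `backfill_history_diff` come straight from the hunk above, while the bare `trio.run` wrapper is an assumption about the surrounding CLI plumbing:

import trio

from piker._daemon import open_piker_runtime
from piker.data.marketstore import backfill_history_diff


async def main() -> None:
    # boot a minimal actor runtime with the docker supervisor
    # module enabled, just as the `ms_shell` cmd does above
    async with open_piker_runtime(
        'ms_shell',
        enable_modules=['piker.data._ahab'],
    ):
        await backfill_history_diff()


trio.run(main)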


@@ -113,7 +113,7 @@ def mk_tbk(keys: tuple[str, str, str]) -> str:
     ``('SPY', '1Sec', 'TICK')`` -> ``"SPY/1Sec/TICK"``

     '''
-    return '{}/' + '/'.join(keys)
+    return '/'.join(keys)


 def quote_to_marketstore_structarray(
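The `mk_tbk()` fix is easiest to see by example: the old body prepended a literal, never-`format()`-ed `'{}/'` placeholder to every time-bucket key. A quick sketch:

def mk_tbk(keys: tuple[str, str, str]) -> str:
    # new body: join the (symbol, timeframe, attrgroup) triple
    return '/'.join(keys)


def mk_tbk_old(keys: tuple[str, str, str]) -> str:
    # old body: the '{}' placeholder leaked into every key
    return '{}/' + '/'.join(keys)


assert mk_tbk(('SPY', '1Sec', 'TICK')) == 'SPY/1Sec/TICK'
assert mk_tbk_old(('SPY', '1Sec', 'TICK')) == '{}/SPY/1Sec/TICK'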
@@ -184,8 +184,8 @@ async def get_client(
         yield client


-# class MarketStoreError(Exception):
-#     "Generic marketstore client error"
+class MarketStoreError(Exception):
+    "Generic marketstore client error"


 # def err_on_resp(response: dict) -> None:
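With `MarketStoreError` now live, the still-commented `err_on_resp()` helper just below it could be revived along the lines of the response-checking loop this commit adds at the bottom of `backfill_history_diff()`; a sketch, with the response shape assumed from that loop:

class MarketStoreError(Exception):
    "Generic marketstore client error"


def err_on_resp(response) -> None:
    # each entry in the write reply's `.responses` list may
    # carry an `.error` string
    for r in response.responses:
        if r.error:
            raise MarketStoreError(r.error)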
@@ -210,13 +210,16 @@ tf_in_1s = bidict({
 })


-# @acm
-async def load_history(
-    symbol: Symbol,
+async def manage_history(
+    fqsn: str,
     period: int = 1,  # in seconds

-) -> np.ndarray:
+) -> dict[str, np.ndarray]:
+    '''
+    Load a series by key and deliver in ``numpy`` struct array
+    format.
+
+    '''
     async with get_client() as client:
         tfstr = tf_in_1s[period]
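The period lookup above leans on the `tf_in_1s` bidict defined just before this hunk; a minimal sketch of that mapping, showing only the entries this commit exercises (any further timeframes are an assumption):

from bidict import bidict

# seconds-per-sample <-> marketstore timeframe string
tf_in_1s = bidict({
    1: '1Sec',
    60: '1Min',
    60 * 5: '5Min',
})

assert tf_in_1s[1] == '1Sec'           # forward: period -> tf string
assert tf_in_1s.inverse['1Min'] == 60  # inverse: tf string -> period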
@@ -225,16 +228,17 @@ async def load_history(
         )

         # Dig out `numpy` results map
         arrays = {}
-        await tractor.breakpoint()
         # for qr in [onem, fivem]:
-        #     for name, data_set in qr.by_symbols().items():
-        #         arrays[(name, qr)] = data_set.array
+        for name, data_set in result.by_symbols().items():
+            arrays[(name, qr)] = data_set.array
+        await tractor.breakpoint()

         # # TODO: backfiller loop
         # array = arrays[(fqsn, qr)]
+        return arrays


-async def backfill_history(
+async def backfill_history_diff(
     # symbol: Symbol

 ) -> list[str]:
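The uncommented loop unpacks pymarketstore query replies into `numpy` struct arrays via `.by_symbols()`; pulled out as a standalone helper it looks roughly like this, with the period-in-seconds keying borrowed from the `backfill_history_diff()` hunk further down:

import numpy as np
from bidict import bidict

tf_in_1s = bidict({1: '1Sec', 60: '1Min'})  # assumed subset


def unpack_arrays(qr) -> dict[tuple[str, int], np.ndarray]:
    # map each "SYM/<tf>/<attrgroup>" result set to its struct array
    arrays: dict[tuple[str, int], np.ndarray] = {}
    for name, data_set in qr.by_symbols().items():
        in_secs = tf_in_1s.inverse[data_set.timeframe]
        arrays[(name, in_secs)] = data_set.array
    return arrays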
@@ -251,7 +255,6 @@ async def backfill_history(
     fqsn = mk_fqsn(broker, symbol)

-    print('yo')
     async with (
         get_client() as client,
         maybe_open_feed(
@@ -263,21 +266,52 @@ async def backfill_history(
         ) as (feed, stream),
     ):
-        print('yo')
-        ohlcv = feed.shm.array
-        mkts_dt = np.dtype(_ohlcv_dt)
-
-        print('yo')
         syms = await client.list_symbols()
         log.info(f'Existing symbol set:\n{pformat(syms)}')

-        # build mkts schema compat array
+        # diff db history with shm and only write the missing portions
+        ohlcv = feed.shm.array
+
+        key = (fqsn, '1Sec', 'OHLCV')
+        tbk = mk_tbk(key)
+
+        # diff vs. existing array and append new history
+        # TODO:
+
+        # TODO: should be no error?
+        # assert not resp.responses
+
+        start = time.time()
+        qr = await client.query(
+            # Params(fqsn, '1Sec`', 'OHLCV',)
+            Params(*key),
+        )
+        # # Dig out `numpy` results map
+        arrays: dict[tuple[str, int], np.ndarray] = {}
+
+        for name, data_set in qr.by_symbols().items():
+            in_secs = tf_in_1s.inverse[data_set.timeframe]
+            arrays[(name, in_secs)] = data_set.array
+
+        s1 = arrays[(fqsn, 1)]
+        to_append = ohlcv[ohlcv['time'] > s1['Epoch'][-1]]
+
+        end_diff = time.time()
+        diff_ms = round((end_diff - start) * 1e3, ndigits=2)
+        log.info(
+            f'Appending {to_append.size} datums to tsdb from shm\n'
+            f'Total diff time: {diff_ms} ms'
+        )
+
+        # build mkts schema compat array for writing
+        mkts_dt = np.dtype(_ohlcv_dt)
         mkts_array = np.zeros(
-            len(ohlcv),
+            len(to_append),
             dtype=mkts_dt,
         )
         # copy from shm array
-        mkts_array[:] = ohlcv[[
+        mkts_array[:] = to_append[[
             'time',
             'open',
             'high',
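The first latency measure added above times the diff step: grab the last `Epoch` stored in the tsdb's 1Sec series, slice the shm OHLCV array to strictly-newer rows, and record the elapsed wall time in ms. As a standalone sketch:

import time

import numpy as np


def diff_new_rows(
    ohlcv: np.ndarray,  # shm struct array with a 'time' epoch field
    s1: np.ndarray,     # tsdb 1Sec series with an 'Epoch' field
) -> tuple[np.ndarray, float]:
    start = time.time()
    # keep only shm rows newer than the last datum already in the db
    to_append = ohlcv[ohlcv['time'] > s1['Epoch'][-1]]
    diff_ms = round((time.time() - start) * 1e3, ndigits=2)
    return to_append, diff_ms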
@@ -286,39 +320,27 @@ async def backfill_history(
             'volume',
         ]]

-        key = (fqsn, '1Sec', 'OHLCV')
-        tbk = mk_tbk(key)
-
-        # diff vs. existing array and append new history
-        # TODO:
-
         # write to db
         resp = await client.write(
             mkts_array,
             tbk=tbk,
             # NOTE: will append duplicates
             # for the same timestamp-index.
-            # isvariablelength=True,
+            isvariablelength=True,
         )
-        # TODO: should be no error?
-        # assert not resp.responses
-
-        # # Dig out `numpy` results map
-        qr = await client.query(
-            Params(fqsn, '1Min`', 'OHLCV',)
-        )
-        qr = await client.query(
-            # Params(fqsn, '1Sec`', 'OHLCV',)
-            Params(*key),
-        )
-        arrays = {}
-        # for qr in [onem, fivem]:
-        for name, data_set in qr.by_symbols().items():
-            arrays[(name, qr)] = data_set.array
+        end_write = time.time()
+        diff_ms = round((end_write - end_diff) * 1e3, ndigits=2)
+        log.info(
+            f'Wrote {to_append.size} datums to tsdb\n'
+            f'Total write time: {diff_ms} ms'
+        )
+        for resp in resp.responses:
+            err = resp.error
+            if err:
+                raise MarketStoreError(err)

         # TODO: backfiller loop
-        array = arrays[(fqsn, qr)]
-        await tractor.breakpoint()
+        # await tractor.breakpoint()


 async def ingest_quote_stream(
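The second measure brackets the write half: `isvariablelength=True` is now actually passed (it appends duplicate entries for the same timestamp-index) and the reply's per-response errors are raised as `MarketStoreError`. Condensed into one hedged sketch, using only the client calls shown in the diff:

import time


class MarketStoreError(Exception):
    "Generic marketstore client error"


async def timed_write(client, mkts_array, tbk: str) -> float:
    start = time.time()
    resp = await client.write(
        mkts_array,
        tbk=tbk,
        # NOTE: appends duplicates for the same timestamp-index
        isvariablelength=True,
    )
    write_ms = round((time.time() - start) * 1e3, ndigits=2)
    # surface any per-response error from the write reply
    for r in resp.responses:
        if r.error:
            raise MarketStoreError(r.error)
    return write_ms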