Add latency measures around diffs/writes to mkts
parent
852f9e3e64
commit
3c6f412a5b
|
@ -119,14 +119,14 @@ def ms_shell(config, tl, host, port):
|
|||
Start an IPython shell ready to query the local marketstore db.
|
||||
|
||||
'''
|
||||
from piker.data.marketstore import backfill_history
|
||||
from piker.data.marketstore import backfill_history_diff
|
||||
from piker._daemon import open_piker_runtime
|
||||
async def main():
|
||||
async with open_piker_runtime(
|
||||
'ms_shell',
|
||||
enable_modules=['piker.data._ahab'],
|
||||
):
|
||||
await backfill_history()
|
||||
await backfill_history_diff()
|
||||
# TODO: write magics to query marketstore
|
||||
# from IPython import embed
|
||||
# embed()
|
||||
|
|
|
@ -113,7 +113,7 @@ def mk_tbk(keys: tuple[str, str, str]) -> str:
|
|||
``('SPY', '1Sec', 'TICK')`` -> ``"SPY/1Sec/TICK"```
|
||||
|
||||
'''
|
||||
return '{}/' + '/'.join(keys)
|
||||
return '/'.join(keys)
|
||||
|
||||
|
||||
def quote_to_marketstore_structarray(
|
||||
|
@ -184,8 +184,8 @@ async def get_client(
|
|||
yield client
|
||||
|
||||
|
||||
# class MarketStoreError(Exception):
|
||||
# "Generic marketstore client error"
|
||||
class MarketStoreError(Exception):
|
||||
"Generic marketstore client error"
|
||||
|
||||
|
||||
# def err_on_resp(response: dict) -> None:
|
||||
|
@ -210,13 +210,16 @@ tf_in_1s = bidict({
|
|||
})
|
||||
|
||||
|
||||
# @acm
|
||||
async def load_history(
|
||||
symbol: Symbol,
|
||||
async def manage_history(
|
||||
fqsn: str,
|
||||
period: int = 1, # in seconds
|
||||
|
||||
) -> np.ndarray:
|
||||
) -> dict[str, np.ndarray]:
|
||||
'''
|
||||
Load a series by key and deliver in ``numpy`` struct array
|
||||
format.
|
||||
|
||||
'''
|
||||
async with get_client() as client:
|
||||
|
||||
tfstr = tf_in_1s[period]
|
||||
|
@ -225,16 +228,17 @@ async def load_history(
|
|||
)
|
||||
# Dig out `numpy` results map
|
||||
arrays = {}
|
||||
await tractor.breakpoint()
|
||||
# for qr in [onem, fivem]:
|
||||
# for name, data_set in qr.by_symbols().items():
|
||||
# arrays[(name, qr)] = data_set.array
|
||||
for name, data_set in result.by_symbols().items():
|
||||
arrays[(name, qr)] = data_set.array
|
||||
|
||||
await tractor.breakpoint()
|
||||
# # TODO: backfiller loop
|
||||
# array = arrays[(fqsn, qr)]
|
||||
return arrays
|
||||
|
||||
|
||||
async def backfill_history(
|
||||
async def backfill_history_diff(
|
||||
# symbol: Symbol
|
||||
|
||||
) -> list[str]:
|
||||
|
@ -251,7 +255,6 @@ async def backfill_history(
|
|||
|
||||
fqsn = mk_fqsn(broker, symbol)
|
||||
|
||||
print('yo')
|
||||
async with (
|
||||
get_client() as client,
|
||||
maybe_open_feed(
|
||||
|
@ -263,21 +266,52 @@ async def backfill_history(
|
|||
|
||||
) as (feed, stream),
|
||||
):
|
||||
print('yo')
|
||||
ohlcv = feed.shm.array
|
||||
mkts_dt = np.dtype(_ohlcv_dt)
|
||||
|
||||
print('yo')
|
||||
syms = await client.list_symbols()
|
||||
log.info(f'Existing symbol set:\n{pformat(syms)}')
|
||||
|
||||
# build mkts schema compat array
|
||||
# diff db history with shm and only write the missing portions
|
||||
ohlcv = feed.shm.array
|
||||
|
||||
key = (fqsn, '1Sec', 'OHLCV')
|
||||
tbk = mk_tbk(key)
|
||||
|
||||
# diff vs. existing array and append new history
|
||||
# TODO:
|
||||
|
||||
# TODO: should be no error?
|
||||
# assert not resp.responses
|
||||
|
||||
start = time.time()
|
||||
|
||||
qr = await client.query(
|
||||
# Params(fqsn, '1Sec`', 'OHLCV',)
|
||||
Params(*key),
|
||||
)
|
||||
# # Dig out `numpy` results map
|
||||
arrays: dict[tuple[str, int], np.ndarray] = {}
|
||||
for name, data_set in qr.by_symbols().items():
|
||||
in_secs = tf_in_1s.inverse[data_set.timeframe]
|
||||
arrays[(name, in_secs)] = data_set.array
|
||||
|
||||
s1 = arrays[(fqsn, 1)]
|
||||
to_append = ohlcv[ohlcv['time'] > s1['Epoch'][-1]]
|
||||
|
||||
end_diff = time.time()
|
||||
diff_ms = round((end_diff - start) * 1e3, ndigits=2)
|
||||
|
||||
log.info(
|
||||
f'Appending {to_append.size} datums to tsdb from shm\n'
|
||||
f'Total diff time: {diff_ms} ms'
|
||||
)
|
||||
|
||||
# build mkts schema compat array for writing
|
||||
mkts_dt = np.dtype(_ohlcv_dt)
|
||||
mkts_array = np.zeros(
|
||||
len(ohlcv),
|
||||
len(to_append),
|
||||
dtype=mkts_dt,
|
||||
)
|
||||
# copy from shm array
|
||||
mkts_array[:] = ohlcv[[
|
||||
mkts_array[:] = to_append[[
|
||||
'time',
|
||||
'open',
|
||||
'high',
|
||||
|
@ -286,39 +320,27 @@ async def backfill_history(
|
|||
'volume',
|
||||
]]
|
||||
|
||||
key = (fqsn, '1Sec', 'OHLCV')
|
||||
tbk = mk_tbk(key)
|
||||
|
||||
# diff vs. existing array and append new history
|
||||
# TODO:
|
||||
|
||||
# write to db
|
||||
resp = await client.write(
|
||||
mkts_array,
|
||||
tbk=tbk,
|
||||
# NOTE: will will append duplicates
|
||||
# for the same timestamp-index.
|
||||
# isvariablelength=True,
|
||||
isvariablelength=True,
|
||||
)
|
||||
# TODO: should be no error?
|
||||
# assert not resp.responses
|
||||
|
||||
# # Dig out `numpy` results map
|
||||
qr = await client.query(
|
||||
Params(fqsn, '1Min`', 'OHLCV',)
|
||||
end_write = time.time()
|
||||
diff_ms = round((end_write - end_diff) * 1e3, ndigits=2)
|
||||
log.info(
|
||||
f'Wrote {to_append.size} datums to tsdb\n'
|
||||
f'Total write time: {diff_ms} ms'
|
||||
)
|
||||
qr = await client.query(
|
||||
# Params(fqsn, '1Sec`', 'OHLCV',)
|
||||
Params(*key),
|
||||
)
|
||||
arrays = {}
|
||||
# for qr in [onem, fivem]:
|
||||
for name, data_set in qr.by_symbols().items():
|
||||
arrays[(name, qr)] = data_set.array
|
||||
for resp in resp.responses:
|
||||
err = resp.error
|
||||
if err:
|
||||
raise MarketStoreError(err)
|
||||
|
||||
# TODO: backfiller loop
|
||||
array = arrays[(fqsn, qr)]
|
||||
await tractor.breakpoint()
|
||||
# await tractor.breakpoint()
|
||||
|
||||
|
||||
async def ingest_quote_stream(
|
||||
|
|
Loading…
Reference in New Issue