Add latency measures around diffs/writes to mkts
parent
4555a1f279
commit
3dba456cf8
|
@ -119,14 +119,14 @@ def ms_shell(config, tl, host, port):
|
||||||
Start an IPython shell ready to query the local marketstore db.
|
Start an IPython shell ready to query the local marketstore db.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
from piker.data.marketstore import backfill_history
|
from piker.data.marketstore import backfill_history_diff
|
||||||
from piker._daemon import open_piker_runtime
|
from piker._daemon import open_piker_runtime
|
||||||
async def main():
|
async def main():
|
||||||
async with open_piker_runtime(
|
async with open_piker_runtime(
|
||||||
'ms_shell',
|
'ms_shell',
|
||||||
enable_modules=['piker.data._ahab'],
|
enable_modules=['piker.data._ahab'],
|
||||||
):
|
):
|
||||||
await backfill_history()
|
await backfill_history_diff()
|
||||||
# TODO: write magics to query marketstore
|
# TODO: write magics to query marketstore
|
||||||
# from IPython import embed
|
# from IPython import embed
|
||||||
# embed()
|
# embed()
|
||||||
|
|
|
@ -113,7 +113,7 @@ def mk_tbk(keys: tuple[str, str, str]) -> str:
|
||||||
``('SPY', '1Sec', 'TICK')`` -> ``"SPY/1Sec/TICK"```
|
``('SPY', '1Sec', 'TICK')`` -> ``"SPY/1Sec/TICK"```
|
||||||
|
|
||||||
'''
|
'''
|
||||||
return '{}/' + '/'.join(keys)
|
return '/'.join(keys)
|
||||||
|
|
||||||
|
|
||||||
def quote_to_marketstore_structarray(
|
def quote_to_marketstore_structarray(
|
||||||
|
@ -184,8 +184,8 @@ async def get_client(
|
||||||
yield client
|
yield client
|
||||||
|
|
||||||
|
|
||||||
# class MarketStoreError(Exception):
|
class MarketStoreError(Exception):
|
||||||
# "Generic marketstore client error"
|
"Generic marketstore client error"
|
||||||
|
|
||||||
|
|
||||||
# def err_on_resp(response: dict) -> None:
|
# def err_on_resp(response: dict) -> None:
|
||||||
|
@ -210,13 +210,16 @@ tf_in_1s = bidict({
|
||||||
})
|
})
|
||||||
|
|
||||||
|
|
||||||
# @acm
|
async def manage_history(
|
||||||
async def load_history(
|
fqsn: str,
|
||||||
symbol: Symbol,
|
|
||||||
period: int = 1, # in seconds
|
period: int = 1, # in seconds
|
||||||
|
|
||||||
) -> np.ndarray:
|
) -> dict[str, np.ndarray]:
|
||||||
|
'''
|
||||||
|
Load a series by key and deliver in ``numpy`` struct array
|
||||||
|
format.
|
||||||
|
|
||||||
|
'''
|
||||||
async with get_client() as client:
|
async with get_client() as client:
|
||||||
|
|
||||||
tfstr = tf_in_1s[period]
|
tfstr = tf_in_1s[period]
|
||||||
|
@ -225,16 +228,17 @@ async def load_history(
|
||||||
)
|
)
|
||||||
# Dig out `numpy` results map
|
# Dig out `numpy` results map
|
||||||
arrays = {}
|
arrays = {}
|
||||||
await tractor.breakpoint()
|
|
||||||
# for qr in [onem, fivem]:
|
# for qr in [onem, fivem]:
|
||||||
# for name, data_set in qr.by_symbols().items():
|
for name, data_set in result.by_symbols().items():
|
||||||
# arrays[(name, qr)] = data_set.array
|
arrays[(name, qr)] = data_set.array
|
||||||
|
|
||||||
|
await tractor.breakpoint()
|
||||||
# # TODO: backfiller loop
|
# # TODO: backfiller loop
|
||||||
# array = arrays[(fqsn, qr)]
|
# array = arrays[(fqsn, qr)]
|
||||||
|
return arrays
|
||||||
|
|
||||||
|
|
||||||
async def backfill_history(
|
async def backfill_history_diff(
|
||||||
# symbol: Symbol
|
# symbol: Symbol
|
||||||
|
|
||||||
) -> list[str]:
|
) -> list[str]:
|
||||||
|
@ -251,7 +255,6 @@ async def backfill_history(
|
||||||
|
|
||||||
fqsn = mk_fqsn(broker, symbol)
|
fqsn = mk_fqsn(broker, symbol)
|
||||||
|
|
||||||
print('yo')
|
|
||||||
async with (
|
async with (
|
||||||
get_client() as client,
|
get_client() as client,
|
||||||
maybe_open_feed(
|
maybe_open_feed(
|
||||||
|
@ -263,21 +266,52 @@ async def backfill_history(
|
||||||
|
|
||||||
) as (feed, stream),
|
) as (feed, stream),
|
||||||
):
|
):
|
||||||
print('yo')
|
|
||||||
ohlcv = feed.shm.array
|
|
||||||
mkts_dt = np.dtype(_ohlcv_dt)
|
|
||||||
|
|
||||||
print('yo')
|
|
||||||
syms = await client.list_symbols()
|
syms = await client.list_symbols()
|
||||||
log.info(f'Existing symbol set:\n{pformat(syms)}')
|
log.info(f'Existing symbol set:\n{pformat(syms)}')
|
||||||
|
|
||||||
# build mkts schema compat array
|
# diff db history with shm and only write the missing portions
|
||||||
|
ohlcv = feed.shm.array
|
||||||
|
|
||||||
|
key = (fqsn, '1Sec', 'OHLCV')
|
||||||
|
tbk = mk_tbk(key)
|
||||||
|
|
||||||
|
# diff vs. existing array and append new history
|
||||||
|
# TODO:
|
||||||
|
|
||||||
|
# TODO: should be no error?
|
||||||
|
# assert not resp.responses
|
||||||
|
|
||||||
|
start = time.time()
|
||||||
|
|
||||||
|
qr = await client.query(
|
||||||
|
# Params(fqsn, '1Sec`', 'OHLCV',)
|
||||||
|
Params(*key),
|
||||||
|
)
|
||||||
|
# # Dig out `numpy` results map
|
||||||
|
arrays: dict[tuple[str, int], np.ndarray] = {}
|
||||||
|
for name, data_set in qr.by_symbols().items():
|
||||||
|
in_secs = tf_in_1s.inverse[data_set.timeframe]
|
||||||
|
arrays[(name, in_secs)] = data_set.array
|
||||||
|
|
||||||
|
s1 = arrays[(fqsn, 1)]
|
||||||
|
to_append = ohlcv[ohlcv['time'] > s1['Epoch'][-1]]
|
||||||
|
|
||||||
|
end_diff = time.time()
|
||||||
|
diff_ms = round((end_diff - start) * 1e3, ndigits=2)
|
||||||
|
|
||||||
|
log.info(
|
||||||
|
f'Appending {to_append.size} datums to tsdb from shm\n'
|
||||||
|
f'Total diff time: {diff_ms} ms'
|
||||||
|
)
|
||||||
|
|
||||||
|
# build mkts schema compat array for writing
|
||||||
|
mkts_dt = np.dtype(_ohlcv_dt)
|
||||||
mkts_array = np.zeros(
|
mkts_array = np.zeros(
|
||||||
len(ohlcv),
|
len(to_append),
|
||||||
dtype=mkts_dt,
|
dtype=mkts_dt,
|
||||||
)
|
)
|
||||||
# copy from shm array
|
# copy from shm array
|
||||||
mkts_array[:] = ohlcv[[
|
mkts_array[:] = to_append[[
|
||||||
'time',
|
'time',
|
||||||
'open',
|
'open',
|
||||||
'high',
|
'high',
|
||||||
|
@ -286,39 +320,27 @@ async def backfill_history(
|
||||||
'volume',
|
'volume',
|
||||||
]]
|
]]
|
||||||
|
|
||||||
key = (fqsn, '1Sec', 'OHLCV')
|
|
||||||
tbk = mk_tbk(key)
|
|
||||||
|
|
||||||
# diff vs. existing array and append new history
|
|
||||||
# TODO:
|
|
||||||
|
|
||||||
# write to db
|
# write to db
|
||||||
resp = await client.write(
|
resp = await client.write(
|
||||||
mkts_array,
|
mkts_array,
|
||||||
tbk=tbk,
|
tbk=tbk,
|
||||||
# NOTE: will will append duplicates
|
# NOTE: will will append duplicates
|
||||||
# for the same timestamp-index.
|
# for the same timestamp-index.
|
||||||
# isvariablelength=True,
|
isvariablelength=True,
|
||||||
)
|
)
|
||||||
# TODO: should be no error?
|
end_write = time.time()
|
||||||
# assert not resp.responses
|
diff_ms = round((end_write - end_diff) * 1e3, ndigits=2)
|
||||||
|
log.info(
|
||||||
# # Dig out `numpy` results map
|
f'Wrote {to_append.size} datums to tsdb\n'
|
||||||
qr = await client.query(
|
f'Total write time: {diff_ms} ms'
|
||||||
Params(fqsn, '1Min`', 'OHLCV',)
|
|
||||||
)
|
)
|
||||||
qr = await client.query(
|
for resp in resp.responses:
|
||||||
# Params(fqsn, '1Sec`', 'OHLCV',)
|
err = resp.error
|
||||||
Params(*key),
|
if err:
|
||||||
)
|
raise MarketStoreError(err)
|
||||||
arrays = {}
|
|
||||||
# for qr in [onem, fivem]:
|
|
||||||
for name, data_set in qr.by_symbols().items():
|
|
||||||
arrays[(name, qr)] = data_set.array
|
|
||||||
|
|
||||||
# TODO: backfiller loop
|
# TODO: backfiller loop
|
||||||
array = arrays[(fqsn, qr)]
|
# await tractor.breakpoint()
|
||||||
await tractor.breakpoint()
|
|
||||||
|
|
||||||
|
|
||||||
async def ingest_quote_stream(
|
async def ingest_quote_stream(
|
||||||
|
|
Loading…
Reference in New Issue