Add `Storage.load()` and `.write_ohlcv()`
parent
bcf3be1fe4
commit
46c23e90db
|
@ -24,6 +24,7 @@
|
|||
|
||||
'''
|
||||
from contextlib import asynccontextmanager as acm
|
||||
from datetime import datetime
|
||||
from pprint import pformat
|
||||
from typing import (
|
||||
Any,
|
||||
|
@ -45,6 +46,7 @@ from anyio_marketstore import (
|
|||
MarketstoreClient,
|
||||
Params,
|
||||
)
|
||||
import pendulum
|
||||
import purerpc
|
||||
|
||||
from .feed import maybe_open_feed
|
||||
|
@ -240,8 +242,29 @@ class Storage:
|
|||
async def write_ticks(self, ticks: list) -> None:
|
||||
...
|
||||
|
||||
async def write_ohlcv(self, ohlcv: np.ndarray) -> None:
|
||||
...
|
||||
async def load(
|
||||
self,
|
||||
fqsn: str,
|
||||
|
||||
) -> tuple[
|
||||
dict[int, np.ndarray], # timeframe (in secs) to series
|
||||
Optional[datetime], # first dt
|
||||
Optional[datetime], # last dt
|
||||
]:
|
||||
|
||||
first_tsdb_dt, last_tsdb_dt = None, None
|
||||
tsdb_arrays = await self.read_ohlcv(fqsn)
|
||||
log.info(f'Loaded tsdb history {tsdb_arrays}')
|
||||
|
||||
if tsdb_arrays:
|
||||
fastest = list(tsdb_arrays.values())[0]
|
||||
times = fastest['Epoch']
|
||||
first, last = times[0], times[-1]
|
||||
first_tsdb_dt, last_tsdb_dt = map(
|
||||
pendulum.from_timestamp, [first, last]
|
||||
)
|
||||
|
||||
return tsdb_arrays, first_tsdb_dt, last_tsdb_dt
|
||||
|
||||
async def read_ohlcv(
|
||||
self,
|
||||
|
@ -319,6 +342,49 @@ class Storage:
|
|||
|
||||
return await client.destroy(tbk=key)
|
||||
|
||||
async def write_ohlcv(
|
||||
self,
|
||||
fqsn: str,
|
||||
ohlcv: np.ndarray,
|
||||
|
||||
) -> None:
|
||||
# build mkts schema compat array for writing
|
||||
mkts_dt = np.dtype(_ohlcv_dt)
|
||||
mkts_array = np.zeros(
|
||||
len(ohlcv),
|
||||
dtype=mkts_dt,
|
||||
)
|
||||
# copy from shm array (yes it's this easy):
|
||||
# https://numpy.org/doc/stable/user/basics.rec.html#assignment-from-other-structured-arrays
|
||||
mkts_array[:] = ohlcv[[
|
||||
'time',
|
||||
'open',
|
||||
'high',
|
||||
'low',
|
||||
'close',
|
||||
'volume',
|
||||
]]
|
||||
|
||||
# write to db
|
||||
resp = await self.client.write(
|
||||
mkts_array,
|
||||
tbk=f'{fqsn}/1Sec/OHLCV',
|
||||
|
||||
# NOTE: will will append duplicates
|
||||
# for the same timestamp-index.
|
||||
# TODO: pre deduplicate?
|
||||
isvariablelength=True,
|
||||
)
|
||||
|
||||
log.info(
|
||||
f'Wrote {mkts_array.size} datums to tsdb\n'
|
||||
)
|
||||
|
||||
for resp in resp.responses:
|
||||
err = resp.error
|
||||
if err:
|
||||
raise MarketStoreError(err)
|
||||
|
||||
|
||||
@acm
|
||||
async def open_storage_client(
|
||||
|
@ -402,6 +468,9 @@ async def tsdb_history_update(
|
|||
to_append = feed.shm.array
|
||||
to_prepend = None
|
||||
|
||||
from tractor.trionics import ipython_embed
|
||||
await ipython_embed()
|
||||
|
||||
# hist diffing
|
||||
if tsdb_arrays:
|
||||
onesec = tsdb_arrays[1]
|
||||
|
@ -417,47 +486,9 @@ async def tsdb_history_update(
|
|||
log.info(
|
||||
f'Writing datums {array.size} -> to tsdb from shm\n'
|
||||
)
|
||||
await storage.write_ohlcv(fqsn, array)
|
||||
|
||||
# build mkts schema compat array for writing
|
||||
mkts_dt = np.dtype(_ohlcv_dt)
|
||||
mkts_array = np.zeros(
|
||||
len(array),
|
||||
dtype=mkts_dt,
|
||||
)
|
||||
# copy from shm array (yes it's this easy):
|
||||
# https://numpy.org/doc/stable/user/basics.rec.html#assignment-from-other-structured-arrays
|
||||
mkts_array[:] = array[[
|
||||
'time',
|
||||
'open',
|
||||
'high',
|
||||
'low',
|
||||
'close',
|
||||
'volume',
|
||||
]]
|
||||
|
||||
# write to db
|
||||
resp = await storage.client.write(
|
||||
mkts_array,
|
||||
tbk=f'{fqsn}/1Sec/OHLCV',
|
||||
|
||||
# NOTE: will will append duplicates
|
||||
# for the same timestamp-index.
|
||||
# TODO: pre deduplicate?
|
||||
isvariablelength=True,
|
||||
)
|
||||
|
||||
log.info(
|
||||
f'Wrote {to_append.size} datums to tsdb\n'
|
||||
)
|
||||
profiler('Finished db writes')
|
||||
|
||||
for resp in resp.responses:
|
||||
err = resp.error
|
||||
if err:
|
||||
raise MarketStoreError(err)
|
||||
|
||||
from tractor.trionics import ipython_embed
|
||||
await ipython_embed()
|
||||
profiler('Finished db writes')
|
||||
|
||||
|
||||
async def ingest_quote_stream(
|
||||
|
|
Loading…
Reference in New Issue