`fqme` adjustments to marketstore module

Mostly renaming from the old acronym. This also adds the conf.toml
loading needed to call `open_storage_client()`, which no longer ships
default network contact info.
Tyler Goodlet 2023-05-17 10:46:32 -04:00
parent ae049eb84f
commit 3294defee1
1 changed file with 37 additions and 27 deletions


@@ -59,6 +59,7 @@ from ._util import (
 )
 from ..data.feed import maybe_open_feed
 from .._profile import Profiler
+from .. import config

 # ahabd-supervisor and container level config
@@ -332,8 +333,8 @@ def quote_to_marketstore_structarray(

 @acm
 async def get_client(
-    host: str = 'localhost',
-    port: int = _config['grpc_listen_port'],
+    host: str | None,
+    port: int | None,

 ) -> MarketstoreClient:
     '''
@@ -342,8 +343,8 @@ async def get_client(

     '''
     async with open_marketstore_client(
-        host,
-        port
+        host or 'localhost',
+        port or _config['grpc_listen_port'],
     ) as client:
         yield client
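
Note the endpoint fallback moves from the signature into the body: both
params are now required at the call site, but passing `None` for either
still resolves to the old defaults. A minimal caller sketch (not part of
this diff):

    async with get_client(
        host=None,  # resolves to 'localhost'
        port=None,  # resolves to _config['grpc_listen_port']
    ) as client:
        syms = await client.list_symbols()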
@@ -407,7 +408,7 @@ class Storage:

     async def load(
         self,
-        fqsn: str,
+        fqme: str,
         timeframe: int,

     ) -> tuple[
@@ -418,7 +419,7 @@ class Storage:
         first_tsdb_dt, last_tsdb_dt = None, None
         hist = await self.read_ohlcv(
-            fqsn,
+            fqme,
             # on first load we don't need to pull the max
             # history per request size worth.
             limit=3000,
@@ -441,7 +442,7 @@ class Storage:

     async def read_ohlcv(
         self,
-        fqsn: str,
+        fqme: str,
         timeframe: int | str,
         end: int | None = None,
         limit: int = int(800e3),
@ -451,14 +452,14 @@ class Storage:
client = self.client
syms = await client.list_symbols()
if fqsn not in syms:
if fqme not in syms:
return {}
# use the provided timeframe or 1s by default
tfstr = tf_in_1s.get(timeframe, tf_in_1s[1])
params = Params(
symbols=fqsn,
symbols=fqme,
timeframe=tfstr,
attrgroup='OHLCV',
end=end,
@@ -488,7 +489,7 @@ class Storage:
         # TODO: it turns out column access on recarrays is actually slower:
         # https://jakevdp.github.io/PythonDataScienceHandbook/02.09-structured-data-numpy.html#RecordArrays:-Structured-Arrays-with-a-Twist
         # it might make sense to make these structured arrays?
-        data_set = result.by_symbols()[fqsn]
+        data_set = result.by_symbols()[fqme]
         array = data_set.array

         # XXX: ensure sample rate is as expected
@@ -503,11 +504,11 @@ class Storage:
                 'YOUR DATABASE LIKELY CONTAINS BAD DATA FROM AN OLD BUG'
                 f'WIPING HISTORY FOR {ts}s'
             )
-            await self.delete_ts(fqsn, timeframe)
+            await self.delete_ts(fqme, timeframe)

             # try reading again..
             return await self.read_ohlcv(
-                fqsn,
+                fqme,
                 timeframe,
                 end,
                 limit,
@@ -537,7 +538,7 @@ class Storage:

     async def write_ohlcv(
         self,
-        fqsn: str,
+        fqme: str,
         ohlcv: np.ndarray,
         timeframe: int,
         append_and_duplicate: bool = True,
@@ -570,7 +571,7 @@ class Storage:

             # write to db
             resp = await self.client.write(
                 to_push,
-                tbk=f'{fqsn}/{tfkey}/OHLCV',
+                tbk=f'{fqme}/{tfkey}/OHLCV',

                 # NOTE: will append duplicates
                 # for the same timestamp-index.
@@ -593,7 +594,7 @@ class Storage:

             # write to db
             resp = await self.client.write(
                 to_push,
-                tbk=f'{fqsn}/{tfkey}/OHLCV',
+                tbk=f'{fqme}/{tfkey}/OHLCV',

                 # NOTE: will append duplicates
                 # for the same timestamp-index.
@@ -625,8 +626,8 @@ class Storage:

 @acm
 async def open_storage_client(
-    fqsn: str,
-    period: Union[int, str | None] = None,  # in seconds
+    host: str,
+    grpc_port: int,

 ) -> tuple[Storage, dict[str, np.ndarray]]:
     '''
@@ -635,7 +636,10 @@ async def open_storage_client(

     '''
     async with (
         # eventually a storage backend endpoint
-        get_client() as client,
+        get_client(
+            host=host,
+            port=grpc_port,
+        ) as client,
     ):
         # slap on our wrapper api
         yield Storage(client)
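
With the old `fqsn`/`period` params dropped, the wrapper is opened purely
by network endpoint; symbol-level operations happen on the yielded
`Storage`. A hypothetical caller sketch (import path and port value are
assumptions, not confirmed by this diff):

    from piker.service.marketstore import open_storage_client  # path assumed

    async def list_stored_symbols() -> list[str]:
        async with open_storage_client(
            host='localhost',
            grpc_port=5995,  # illustrative; marketstore's usual gRPC port
        ) as storage:
            # `Storage` wraps the raw grpc client as `.client`
            return await storage.client.list_symbols()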
@@ -643,7 +647,7 @@ async def open_storage_client(

 @acm
 async def open_tsdb_client(
-    fqsn: str,
+    fqme: str,

 ) -> Storage:

     # TODO: real-time dedicated task for ensuring
@@ -677,25 +681,31 @@ async def open_tsdb_client(
         delayed=False,
     )

+    # load any user service settings for connecting to tsdb
+    conf, path = config.load('conf')
+    tsdbconf = conf['network'].get('tsdb')
+    backend = tsdbconf.pop('backend')
     async with (
-        open_storage_client(fqsn) as storage,
+        open_storage_client(
+            **tsdbconf,
+        ) as storage,

         maybe_open_feed(
-            [fqsn],
+            [fqme],
             start_stream=False,

         ) as feed,
     ):
-        profiler(f'opened feed for {fqsn}')
+        profiler(f'opened feed for {fqme}')

         # to_append = feed.hist_shm.array
         # to_prepend = None

-        if fqsn:
-            flume = feed.flumes[fqsn]
+        if fqme:
+            flume = feed.flumes[fqme]
             symbol = flume.symbol
             if symbol:
-                fqsn = symbol.fqsn
+                fqme = symbol.fqme

         # diff db history with shm and only write the missing portions
         # ohlcv = flume.hist_shm.array
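
The `config.load('conf')` sequence above implies a `[network.tsdb]` table
in the user's conf.toml whose remaining keys, once `backend` is popped,
map 1:1 onto `open_storage_client()`'s kwargs. A plausible sketch of that
table and the load flow (key values illustrative, inferred only from this
diff):

    # expected conf.toml layout (sketch):
    #
    #   [network.tsdb]
    #   backend = 'marketstore'
    #   host = 'localhost'
    #   grpc_port = 5995

    from piker import config

    conf, path = config.load('conf')
    tsdbconf = conf['network'].get('tsdb')
    backend = tsdbconf.pop('backend')  # e.g. 'marketstore'
    # whatever remains feeds straight into `open_storage_client(**tsdbconf)`
    assert set(tsdbconf) == {'host', 'grpc_port'}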
@@ -703,7 +713,7 @@ async def open_tsdb_client(
         # TODO: use pg profiler
         # for secs in (1, 60):
         #     tsdb_array = await storage.read_ohlcv(
-        #         fqsn,
+        #         fqme,
         #         timeframe=timeframe,
         #     )
         #     # hist diffing:
@@ -726,7 +736,7 @@ async def open_tsdb_client(
         #     log.info(
         #         f'Writing datums {array.size} -> to tsdb from shm\n'
         #     )
-        #     await storage.write_ohlcv(fqsn, array)
+        #     await storage.write_ohlcv(fqme, array)

         # profiler('Finished db writes')