ROFL, even using `pymarketstore`'s json-RPC it's borked..
Turns out trying to switch to the old sync client and going back to using the old json-RPC API (after having had to patch the upstream repo to not import gRPC machinery to avoid crashes..) I'm basically getting the exact same issues. New tinkering results does possibly tell some new stuff: - the EOF error seems to indeed be due to trying fetch records which haven't been written (properly) - like asking for a `end=<epoch_int>` that is earlier then the earliest record. - the "snappy input corrupt" error seems to have something to do with the `Params.end` field not being an `int` and/or the int precision not being chosen correctly? - toying with this a bunch manually shows that the internals of the client (particularly `.build_query()` stuff) is parsing/calcing the `Epoch` and `Nanoseconds` values out incorrectly.. which is likely part of the problem. - we also changed `anyio_marketstore.MarketStoreclient.build_query()` logic when removing `pandas` a while back, which also seems to be part of the problem on the async side, however reverting those changes also didn't fix the issue entirely; likely something else more subtle going on (maybe with the write vs. read `Epoch` field type we pass?). Despite all this malarky, we're already underway more or less obsoleting this whole thing with a much less complex approach of using apache parquet files and modern filesystem tools to get a more flexible and numerics-native dataframe-oriented tsdb B)basic_buy_bot
parent
9859f601ca
commit
7d1cc47db9
|
@ -46,7 +46,7 @@ from anyio_marketstore import (
|
|||
import pendulum
|
||||
import purerpc
|
||||
|
||||
from ..service.marketstore import (
|
||||
from piker.service.marketstore import (
|
||||
MarketstoreClient,
|
||||
tf_in_1s,
|
||||
mk_tbk,
|
||||
|
@ -58,7 +58,7 @@ from anyio_marketstore import ( # noqa
|
|||
MarketstoreClient,
|
||||
Params,
|
||||
)
|
||||
from ..log import get_logger
|
||||
from piker.log import get_logger
|
||||
# from .._profile import Profiler
|
||||
|
||||
|
||||
|
@ -107,7 +107,6 @@ class MktsStorageClient:
|
|||
datetime | None, # first dt
|
||||
datetime | None, # last dt
|
||||
]:
|
||||
|
||||
first_tsdb_dt, last_tsdb_dt = None, None
|
||||
hist = await self.read_ohlcv(
|
||||
fqme,
|
||||
|
@ -119,10 +118,13 @@ class MktsStorageClient:
|
|||
log.info(f'Loaded tsdb history {hist}')
|
||||
|
||||
if len(hist):
|
||||
times = hist['Epoch']
|
||||
# breakpoint()
|
||||
times: np.ndarray = hist['Epoch']
|
||||
|
||||
first, last = times[0], times[-1]
|
||||
first_tsdb_dt, last_tsdb_dt = map(
|
||||
pendulum.from_timestamp, [first, last]
|
||||
pendulum.from_timestamp,
|
||||
[first, last]
|
||||
)
|
||||
|
||||
return (
|
||||
|
@ -135,53 +137,82 @@ class MktsStorageClient:
|
|||
self,
|
||||
fqme: str,
|
||||
timeframe: int | str,
|
||||
end: int | None = None,
|
||||
limit: int = int(800e3),
|
||||
end: float | None = None, # epoch or none
|
||||
limit: int = int(200e3),
|
||||
|
||||
) -> np.ndarray:
|
||||
|
||||
client = self.client
|
||||
syms = await client.list_symbols()
|
||||
|
||||
if fqme not in syms:
|
||||
return {}
|
||||
|
||||
# ensure end time is in correct int format!
|
||||
if (
|
||||
end
|
||||
and not isinstance(end, float)
|
||||
):
|
||||
end = int(float(end))
|
||||
# breakpoint()
|
||||
|
||||
# use the provided timeframe or 1s by default
|
||||
tfstr = tf_in_1s.get(timeframe, tf_in_1s[1])
|
||||
|
||||
params = Params(
|
||||
import pymarketstore as pymkts
|
||||
sync_client = pymkts.Client()
|
||||
param = pymkts.Params(
|
||||
symbols=fqme,
|
||||
timeframe=tfstr,
|
||||
attrgroup='OHLCV',
|
||||
end=end,
|
||||
# limit_from_start=True,
|
||||
|
||||
# TODO: figure the max limit here given the
|
||||
# ``purepc`` msg size limit of purerpc: 33554432
|
||||
limit=limit,
|
||||
# limit_from_start=True,
|
||||
)
|
||||
try:
|
||||
reply = sync_client.query(param)
|
||||
except Exception as err:
|
||||
if 'no files returned from query parse: None' in err.args:
|
||||
return []
|
||||
|
||||
for i in range(3):
|
||||
try:
|
||||
result = await client.query(params)
|
||||
break
|
||||
except purerpc.grpclib.exceptions.UnknownError as err:
|
||||
if 'snappy' in err.args:
|
||||
await tractor.breakpoint()
|
||||
raise
|
||||
|
||||
# indicate there is no history for this timeframe
|
||||
log.exception(
|
||||
f'Unknown mkts QUERY error: {params}\n'
|
||||
f'{err.args}'
|
||||
)
|
||||
else:
|
||||
return {}
|
||||
data_set: pymkts.results.DataSet = reply.first()
|
||||
array: np.ndarray = data_set.array
|
||||
|
||||
# TODO: it turns out column access on recarrays is actually slower:
|
||||
# https://jakevdp.github.io/PythonDataScienceHandbook/02.09-structured-data-numpy.html#RecordArrays:-Structured-Arrays-with-a-Twist
|
||||
# it might make sense to make these structured arrays?
|
||||
data_set = result.by_symbols()[fqme]
|
||||
array = data_set.array
|
||||
# params = Params(
|
||||
# symbols=fqme,
|
||||
# timeframe=tfstr,
|
||||
# attrgroup='OHLCV',
|
||||
# end=end,
|
||||
# # limit_from_start=True,
|
||||
|
||||
# # TODO: figure the max limit here given the
|
||||
# # ``purepc`` msg size limit of purerpc: 33554432
|
||||
# limit=limit,
|
||||
# )
|
||||
|
||||
# for i in range(3):
|
||||
# try:
|
||||
# result = await client.query(params)
|
||||
# break
|
||||
# except purerpc.grpclib.exceptions.UnknownError as err:
|
||||
# if 'snappy' in err.args:
|
||||
# await tractor.breakpoint()
|
||||
|
||||
# # indicate there is no history for this timeframe
|
||||
# log.exception(
|
||||
# f'Unknown mkts QUERY error: {params}\n'
|
||||
# f'{err.args}'
|
||||
# )
|
||||
# else:
|
||||
# return {}
|
||||
|
||||
# # TODO: it turns out column access on recarrays is actually slower:
|
||||
# # https://jakevdp.github.io/PythonDataScienceHandbook/02.09-structured-data-numpy.html#RecordArrays:-Structured-Arrays-with-a-Twist
|
||||
# # it might make sense to make these structured arrays?
|
||||
# data_set = result.by_symbols()[fqme]
|
||||
# array = data_set.array
|
||||
|
||||
# XXX: ensure sample rate is as expected
|
||||
time = data_set.array['Epoch']
|
||||
|
@ -191,19 +222,20 @@ class MktsStorageClient:
|
|||
|
||||
if time_step != ts:
|
||||
log.warning(
|
||||
f'MKTS BUG: wrong timeframe loaded: {time_step}'
|
||||
'YOUR DATABASE LIKELY CONTAINS BAD DATA FROM AN OLD BUG'
|
||||
f'MKTS BUG: wrong timeframe loaded: {time_step}\n'
|
||||
'YOUR DATABASE LIKELY CONTAINS BAD DATA FROM AN OLD BUG '
|
||||
f'WIPING HISTORY FOR {ts}s'
|
||||
)
|
||||
await self.delete_ts(fqme, timeframe)
|
||||
await tractor.breakpoint()
|
||||
# await self.delete_ts(fqme, timeframe)
|
||||
|
||||
# try reading again..
|
||||
return await self.read_ohlcv(
|
||||
fqme,
|
||||
timeframe,
|
||||
end,
|
||||
limit,
|
||||
)
|
||||
# return await self.read_ohlcv(
|
||||
# fqme,
|
||||
# timeframe,
|
||||
# end,
|
||||
# limit,
|
||||
# )
|
||||
|
||||
return array
|
||||
|
||||
|
|
Loading…
Reference in New Issue