Add marketstore client as storage-backend module

To kick off our (tsdb) storage backends this adds our first implementing
a new `Storage(Protocol)` client interface. Going foward, the top level
`.storage` pkg-module will now expose backend agnostic APIs and helpers
whilst specific backend implementations will adhere to that middle-ware
layer.

Deats:
- add `.storage.marketstore.Storage` as the first client implementation,
  moving all needed (import) dependencies out from
  `.service.marketstore` as well as `.ohlc_key_map` and `get_client()`.
- move root `conf.toml` loading from `.data.history` into
  `.storage.__init__.open_storage_client()` which now takes in a `name:
  str` and does all the work of loading the correct backend module, its
  config, and determining if a service-instance can be contacted and
  a client loaded; in the case where this fails we raise a new
  `StorageConnectionError`.
- add a new `.storage.get_storagemod()` just like we have for brokers.
- make `open_storage_client()` also return the backend module such that
  the history-data layer can make backend specific calls as needed (eg.
  ohlc_key_map).
- fall back to a basic non-tsdb backfill when `open_storage_client()`
  raises the new connection error.
basic_buy_bot
Tyler Goodlet 2023-05-29 13:52:55 -04:00
parent 29211b200d
commit 7ab97fb21d
5 changed files with 474 additions and 338 deletions

View File

@ -121,7 +121,7 @@ def storesh(
Start an IPython shell ready to query the local marketstore db.
'''
from piker.data.marketstore import open_tsdb_client
from piker.storage import open_tsdb_client
from piker.service import open_piker_runtime
async def main():
@ -171,7 +171,7 @@ def storage(
Start an IPython shell ready to query the local marketstore db.
'''
from piker.service.marketstore import open_tsdb_client
from piker.storage import open_tsdb_client
from piker.service import open_piker_runtime
async def main():

View File

@ -28,7 +28,6 @@ import time
from types import ModuleType
from typing import (
Callable,
Optional,
TYPE_CHECKING,
)
@ -38,16 +37,12 @@ import tractor
import pendulum
import numpy as np
from .. import config
from ..accounting import (
MktPair,
)
from ._util import (
log,
)
from ..service import (
check_for_service,
)
from ._sharedmem import (
maybe_open_shm_array,
ShmArray,
@ -91,8 +86,8 @@ async def start_backfill(
sampler_stream: tractor.MsgStream,
feed_is_live: trio.Event,
last_tsdb_dt: Optional[datetime] = None,
storage: Optional[Storage] = None,
last_tsdb_dt: datetime | None = None,
storage: Storage | None = None,
write_tsdb: bool = True,
tsdb_is_up: bool = False,
@ -391,7 +386,7 @@ async def basic_backfill(
async def tsdb_backfill(
mod: ModuleType,
marketstore: ModuleType,
storemod: ModuleType,
bus: _FeedsBus,
storage: Storage,
mkt: MktPair,
@ -542,7 +537,7 @@ async def tsdb_backfill(
prepend=True,
# update_first=False,
# start=prepend_start,
field_map=marketstore.ohlc_key_map,
field_map=storemod.ohlc_key_map,
)
tsdb_last_frame_start = tsdb_history['Epoch'][0]
@ -580,7 +575,7 @@ async def tsdb_backfill(
shm.push(
to_push,
prepend=True,
field_map=marketstore.ohlc_key_map,
field_map=storemod.ohlc_key_map,
)
log.info(f'Loaded {to_push.shape} datums from storage')
@ -626,12 +621,11 @@ async def manage_history(
) -> None:
'''
Load and manage historical data including the loading of any
available series from `marketstore` as well as conducting real-time
update of both that existing db and the allocated shared memory
buffer.
available series from any connected tsdb as well as conduct
real-time update of both that existing db and the allocated shared
memory buffer.
'''
# TODO: is there a way to make each shm file key
# actor-tree-discovery-addr unique so we avoid collisions
# when doing tests which also allocate shms for certain instruments
@ -711,52 +705,17 @@ async def manage_history(
None,
)
assert open_history_client
tsdb_is_up: bool = False
try_remote_tsdb: bool = False
conf, path = config.load('conf', touch_if_dne=True)
net = conf.get('network')
if net:
tsdbconf = net.get('tsdb')
# lookup backend tsdb module by name and load any user service
# settings for connecting to the tsdb service.
tsdb_backend: str = tsdbconf.pop('backend')
tsdb_host: str = tsdbconf['host']
# TODO: import and load storagemod by name
# mod = get_storagemod(tsdb_backend)
from ..service import marketstore
if tsdb_host == 'localhost':
log.info('Scanning for existing `{tsbd_backend}`')
tsdb_is_up: bool = await check_for_service(f'{tsdb_backend}d')
else:
try_remote_tsdb: bool = True
if (
tsdb_is_up
or try_remote_tsdb
and (
opened
and open_history_client
)
):
log.info('Found existing `marketstored`')
async with (
marketstore.open_storage_client(
**tsdbconf
) as storage,
):
from .. import storage
try:
async with storage.open_storage_client() as (storemod, client):
log.info(f'Found existing `{storemod.name}`')
# TODO: drop returning the output that we pass in?
await bus.nursery.start(
tsdb_backfill,
mod,
marketstore,
storemod,
bus,
storage,
client,
mkt,
{
1: rt_shm,
@ -784,11 +743,11 @@ async def manage_history(
# what data is loaded for viewing.
await trio.sleep_forever()
# load less history if no tsdb can be found
elif (
not tsdb_is_up
and opened
):
except storage.StorageConnectionError:
log.exception(
"Can't connect to tsdb backend!?\n"
'Starting basic backfille to shm..'
)
await basic_backfill(
bus,
mod,

View File

@ -327,16 +327,6 @@ _ohlcv_dt = [
]
ohlc_key_map = bidict({
'Epoch': 'time',
'Open': 'open',
'High': 'high',
'Low': 'low',
'Close': 'close',
'Volume': 'volume',
})
def mk_tbk(keys: tuple[str, str, str]) -> str:
'''
Generate a marketstore table key from a tuple.
@ -388,24 +378,6 @@ def quote_to_marketstore_structarray(
return np.array([tuple(array_input)], dtype=_quote_dt)
@acm
async def get_client(
host: str | None,
port: int | None,
) -> MarketstoreClient:
'''
Load a ``anyio_marketstore`` grpc client connected
to an existing ``marketstore`` server.
'''
async with open_marketstore_client(
host or 'localhost',
port or _config['grpc_listen_port'],
) as client:
yield client
class MarketStoreError(Exception):
"Generic marketstore client error"
@ -444,6 +416,7 @@ async def ingest_quote_stream(
Ingest a broker quote stream into a ``marketstore`` tsdb.
'''
from piker.storage.marketstore import get_client
async with (
maybe_open_feed(brokername, symbols, loglevel=loglevel) as feed,
get_client() as ms_client,

View File

@ -25,68 +25,67 @@
- test harness utilities for data-processing verification.
'''
from __future__ import annotations
from abc import abstractmethod
from contextlib import asynccontextmanager as acm
from functools import partial
from importlib import import_module
from datetime import datetime
# from pprint import pformat
from types import ModuleType
from typing import (
Union,
# Callable,
# Awaitable,
# Any,
# AsyncIterator,
Protocol,
# Generic,
# TypeVar,
)
import tractor
import numpy as np
from anyio_marketstore import (
Params,
from .. import config
from ..service import (
check_for_service,
)
import pendulum
import purerpc
from . import config
from ..service.marketstore import (
MarketstoreClient,
tf_in_1s,
mk_tbk,
_ohlcv_dt,
MarketStoreError,
from ..log import (
get_logger,
get_console_log,
)
subsys: str = 'piker.storage'
log = get_logger(subsys)
get_console_log = partial(
get_console_log,
name=subsys,
)
from ..data.feed import maybe_open_feed
from ..log import get_logger
from .._profile import Profiler
log = get_logger(__name__)
class Storage:
class Storage(
Protocol,
):
'''
High level storage api for both real-time and historical ingest.
Api description that all storage backends must implement
in order to suffice the historical data mgmt layer.
'''
def __init__(
self,
client: MarketstoreClient,
) -> None:
# TODO: eventually this should be an api/interface type that
# ensures we can support multiple tsdb backends.
self.client = client
# series' cache from tsdb reads
self._arrays: dict[str, np.ndarray] = {}
@abstractmethod
async def list_keys(self) -> list[str]:
return await self.client.list_symbols()
async def search_keys(self, pattern: str) -> list[str]:
'''
Search for time series key in the storage backend.
'''
...
async def write_ticks(self, ticks: list) -> None:
@abstractmethod
def search_keys(self) -> list[str]:
...
# @abstractmethod
# async def write_ticks(
# self,
# ticks: list,
# ) -> ReceiveType:
# ...
# ``trio.abc.AsyncResource`` methods
@abstractmethod
async def load(
self,
fqme: str,
@ -97,30 +96,19 @@ class Storage:
datetime | None, # first dt
datetime | None, # last dt
]:
...
first_tsdb_dt, last_tsdb_dt = None, None
hist = await self.read_ohlcv(
fqme,
# on first load we don't need to pull the max
# history per request size worth.
limit=3000,
timeframe=timeframe,
)
log.info(f'Loaded tsdb history {hist}')
@abstractmethod
async def delete_ts(
self,
key: str,
timeframe: int | str | None = None,
fmt: str = 'OHLCV',
if len(hist):
times = hist['Epoch']
first, last = times[0], times[-1]
first_tsdb_dt, last_tsdb_dt = map(
pendulum.from_timestamp, [first, last]
)
return (
hist, # array-data
first_tsdb_dt, # start of query-frame
last_tsdb_dt, # most recent
)
) -> bool:
...
@abstractmethod
async def read_ohlcv(
self,
fqme: str,
@ -129,94 +117,7 @@ class Storage:
limit: int = int(800e3),
) -> np.ndarray:
client = self.client
syms = await client.list_symbols()
if fqme not in syms:
return {}
# use the provided timeframe or 1s by default
tfstr = tf_in_1s.get(timeframe, tf_in_1s[1])
params = Params(
symbols=fqme,
timeframe=tfstr,
attrgroup='OHLCV',
end=end,
# limit_from_start=True,
# TODO: figure the max limit here given the
# ``purepc`` msg size limit of purerpc: 33554432
limit=limit,
)
for i in range(3):
try:
result = await client.query(params)
break
except purerpc.grpclib.exceptions.UnknownError as err:
if 'snappy' in err.args:
await tractor.breakpoint()
# indicate there is no history for this timeframe
log.exception(
f'Unknown mkts QUERY error: {params}\n'
f'{err.args}'
)
else:
return {}
# TODO: it turns out column access on recarrays is actually slower:
# https://jakevdp.github.io/PythonDataScienceHandbook/02.09-structured-data-numpy.html#RecordArrays:-Structured-Arrays-with-a-Twist
# it might make sense to make these structured arrays?
data_set = result.by_symbols()[fqme]
array = data_set.array
# XXX: ensure sample rate is as expected
time = data_set.array['Epoch']
if len(time) > 1:
time_step = time[-1] - time[-2]
ts = tf_in_1s.inverse[data_set.timeframe]
if time_step != ts:
log.warning(
f'MKTS BUG: wrong timeframe loaded: {time_step}'
'YOUR DATABASE LIKELY CONTAINS BAD DATA FROM AN OLD BUG'
f'WIPING HISTORY FOR {ts}s'
)
await self.delete_ts(fqme, timeframe)
# try reading again..
return await self.read_ohlcv(
fqme,
timeframe,
end,
limit,
)
return array
async def delete_ts(
self,
key: str,
timeframe: Union[int, str | None] = None,
fmt: str = 'OHLCV',
) -> bool:
client = self.client
syms = await client.list_symbols()
if key not in syms:
await tractor.breakpoint()
raise KeyError(f'`{key}` table key not found in\n{syms}?')
tbk = mk_tbk((
key,
tf_in_1s.get(timeframe, tf_in_1s[60]),
fmt,
))
return await client.destroy(tbk=tbk)
...
async def write_ohlcv(
self,
@ -227,106 +128,74 @@ class Storage:
limit: int = int(800e3),
) -> None:
# build mkts schema compat array for writing
mkts_dt = np.dtype(_ohlcv_dt)
mkts_array = np.zeros(
len(ohlcv),
dtype=mkts_dt,
)
# copy from shm array (yes it's this easy):
# https://numpy.org/doc/stable/user/basics.rec.html#assignment-from-other-structured-arrays
mkts_array[:] = ohlcv[[
'time',
'open',
'high',
'low',
'close',
'volume',
]]
...
m, r = divmod(len(mkts_array), limit)
tfkey = tf_in_1s[timeframe]
for i in range(m, 1):
to_push = mkts_array[i-1:i*limit]
class StorageConnectionError(ConnectionError):
'''
Can't connect to the desired tsdb subsys/service.
# write to db
resp = await self.client.write(
to_push,
tbk=f'{fqme}/{tfkey}/OHLCV',
'''
# NOTE: will will append duplicates
# for the same timestamp-index.
# TODO: pre-deduplicate?
isvariablelength=append_and_duplicate,
def get_storagemod(name: str) -> ModuleType:
mod: ModuleType = import_module(
'.' + name,
'piker.storage',
)
log.info(
f'Wrote {mkts_array.size} datums to tsdb\n'
)
for resp in resp.responses:
err = resp.error
if err:
raise MarketStoreError(err)
if r:
to_push = mkts_array[m*limit:]
# write to db
resp = await self.client.write(
to_push,
tbk=f'{fqme}/{tfkey}/OHLCV',
# NOTE: will will append duplicates
# for the same timestamp-index.
# TODO: pre deduplicate?
isvariablelength=append_and_duplicate,
)
log.info(
f'Wrote {mkts_array.size} datums to tsdb\n'
)
for resp in resp.responses:
err = resp.error
if err:
raise MarketStoreError(err)
# XXX: currently the only way to do this is through the CLI:
# sudo ./marketstore connect --dir ~/.config/piker/data
# >> \show mnq.globex.20220617.ib/1Sec/OHLCV 2022-05-15
# and this seems to block and use up mem..
# >> \trim mnq.globex.20220617.ib/1Sec/OHLCV 2022-05-15
# relevant source code for this is here:
# https://github.com/alpacahq/marketstore/blob/master/cmd/connect/session/trim.go#L14
# def delete_range(self, start_dt, end_dt) -> None:
# ...
# we only allow monkeying because it's for internal keying
mod.name = mod.__name__.split('.')[-1]
return mod
@acm
async def open_storage_client(
host: str,
grpc_port: int,
name: str | None = None,
) -> tuple[Storage, dict[str, np.ndarray]]:
) -> tuple[ModuleType, Storage]:
'''
Load a series by key and deliver in ``numpy`` struct array format.
Load the ``Storage`` client for named backend.
'''
from piker.service.marketstore import get_client
# load root config for tsdb
conf, path = config.load('conf', touch_if_dne=True)
net = conf.get('network')
if net:
tsdbconf = net.get('tsdb')
# lookup backend tsdb module by name and load any user service
# settings for connecting to the tsdb service.
name: str = tsdbconf.pop('backend')
tsdb_host: str = tsdbconf['host']
if name is None:
raise RuntimeError('No tsdb backend has been set!?')
# import and load storagemod by name
mod: ModuleType = get_storagemod(name)
get_client = mod.get_client
log.info('Scanning for existing `{tsbd_backend}`')
tsdb_is_up: bool = await check_for_service(f'{name}d')
if (
tsdb_host == 'localhost'
or tsdb_is_up
):
log.info(f'Connecting to local {name}@{tsdbconf}')
else:
log.info(f'Attempting to connect to remote {name}@{tsdbconf}')
try:
async with (
# eventually a storage backend endpoint
get_client(
host=host,
port=grpc_port,
) as client,
get_client(**tsdbconf) as client,
):
# slap on our wrapper api
yield Storage(client)
yield mod, client
except Exception as err:
raise StorageConnectionError(
f'No connection to {name}'
) from err
# NOTE: pretty sure right now this is only being
@ -362,22 +231,15 @@ async def open_tsdb_client(
# * the original data feed arch blurb:
# - https://github.com/pikers/piker/issues/98
#
from .._profile import Profiler
profiler = Profiler(
disabled=True, # not pg_profile_enabled(),
delayed=False,
)
from ..data.feed import maybe_open_feed
# load any user service settings for connecting to
rootconf, path = config.load(
'conf',
touch_if_dne=True,
)
tsdbconf = rootconf['network'].get('tsdb')
# backend = tsdbconf.pop('backend')
async with (
open_storage_client(
**tsdbconf,
) as storage,
open_storage_client() as (_, storage),
maybe_open_feed(
[fqme],

View File

@ -0,0 +1,342 @@
# piker: trading gear for hackers
# Copyright (C) Tyler Goodlet (in stewardship for pikers)
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
marketstore tsdb backend:
https://github.com/alpacahq/marketstore
We wrote an async gGRPC client:
https://github.com/pikers/anyio-marketstore
which is normally preferred minus the discovered issues
in https://github.com/pikers/piker/issues/443
Which is the main reason for us moving away from this
platform..
'''
from __future__ import annotations
from contextlib import asynccontextmanager as acm
from datetime import datetime
# from pprint import pformat
from typing import (
Union,
)
from bidict import bidict
import tractor
import numpy as np
from anyio_marketstore import (
Params,
)
import pendulum
import purerpc
from ..service.marketstore import (
MarketstoreClient,
tf_in_1s,
mk_tbk,
_ohlcv_dt,
MarketStoreError,
)
from anyio_marketstore import ( # noqa
open_marketstore_client,
MarketstoreClient,
Params,
)
from ..log import get_logger
# from .._profile import Profiler
log = get_logger(__name__)
class Storage:
'''
High level storage api for both real-time and historical ingest.
'''
def __init__(
self,
client: MarketstoreClient,
) -> None:
# TODO: eventually this should be an api/interface type that
# ensures we can support multiple tsdb backends.
self.client = client
# series' cache from tsdb reads
self._arrays: dict[str, np.ndarray] = {}
async def list_keys(self) -> list[str]:
return await self.client.list_symbols()
async def search_keys(self, pattern: str) -> list[str]:
'''
Search for time series key in the storage backend.
'''
...
async def write_ticks(self, ticks: list) -> None:
...
async def load(
self,
fqme: str,
timeframe: int,
) -> tuple[
np.ndarray, # timeframe sampled array-series
datetime | None, # first dt
datetime | None, # last dt
]:
first_tsdb_dt, last_tsdb_dt = None, None
hist = await self.read_ohlcv(
fqme,
# on first load we don't need to pull the max
# history per request size worth.
limit=3000,
timeframe=timeframe,
)
log.info(f'Loaded tsdb history {hist}')
if len(hist):
times = hist['Epoch']
first, last = times[0], times[-1]
first_tsdb_dt, last_tsdb_dt = map(
pendulum.from_timestamp, [first, last]
)
return (
hist, # array-data
first_tsdb_dt, # start of query-frame
last_tsdb_dt, # most recent
)
async def read_ohlcv(
self,
fqme: str,
timeframe: int | str,
end: int | None = None,
limit: int = int(800e3),
) -> np.ndarray:
client = self.client
syms = await client.list_symbols()
if fqme not in syms:
return {}
# use the provided timeframe or 1s by default
tfstr = tf_in_1s.get(timeframe, tf_in_1s[1])
params = Params(
symbols=fqme,
timeframe=tfstr,
attrgroup='OHLCV',
end=end,
# limit_from_start=True,
# TODO: figure the max limit here given the
# ``purepc`` msg size limit of purerpc: 33554432
limit=limit,
)
for i in range(3):
try:
result = await client.query(params)
break
except purerpc.grpclib.exceptions.UnknownError as err:
if 'snappy' in err.args:
await tractor.breakpoint()
# indicate there is no history for this timeframe
log.exception(
f'Unknown mkts QUERY error: {params}\n'
f'{err.args}'
)
else:
return {}
# TODO: it turns out column access on recarrays is actually slower:
# https://jakevdp.github.io/PythonDataScienceHandbook/02.09-structured-data-numpy.html#RecordArrays:-Structured-Arrays-with-a-Twist
# it might make sense to make these structured arrays?
data_set = result.by_symbols()[fqme]
array = data_set.array
# XXX: ensure sample rate is as expected
time = data_set.array['Epoch']
if len(time) > 1:
time_step = time[-1] - time[-2]
ts = tf_in_1s.inverse[data_set.timeframe]
if time_step != ts:
log.warning(
f'MKTS BUG: wrong timeframe loaded: {time_step}'
'YOUR DATABASE LIKELY CONTAINS BAD DATA FROM AN OLD BUG'
f'WIPING HISTORY FOR {ts}s'
)
await self.delete_ts(fqme, timeframe)
# try reading again..
return await self.read_ohlcv(
fqme,
timeframe,
end,
limit,
)
return array
async def delete_ts(
self,
key: str,
timeframe: Union[int, str | None] = None,
fmt: str = 'OHLCV',
) -> bool:
client = self.client
syms = await client.list_symbols()
if key not in syms:
await tractor.breakpoint()
raise KeyError(f'`{key}` table key not found in\n{syms}?')
tbk = mk_tbk((
key,
tf_in_1s.get(timeframe, tf_in_1s[60]),
fmt,
))
return await client.destroy(tbk=tbk)
async def write_ohlcv(
self,
fqme: str,
ohlcv: np.ndarray,
timeframe: int,
append_and_duplicate: bool = True,
limit: int = int(800e3),
) -> None:
# build mkts schema compat array for writing
mkts_dt = np.dtype(_ohlcv_dt)
mkts_array = np.zeros(
len(ohlcv),
dtype=mkts_dt,
)
# copy from shm array (yes it's this easy):
# https://numpy.org/doc/stable/user/basics.rec.html#assignment-from-other-structured-arrays
mkts_array[:] = ohlcv[[
'time',
'open',
'high',
'low',
'close',
'volume',
]]
m, r = divmod(len(mkts_array), limit)
tfkey = tf_in_1s[timeframe]
for i in range(m, 1):
to_push = mkts_array[i-1:i*limit]
# write to db
resp = await self.client.write(
to_push,
tbk=f'{fqme}/{tfkey}/OHLCV',
# NOTE: will will append duplicates
# for the same timestamp-index.
# TODO: pre-deduplicate?
isvariablelength=append_and_duplicate,
)
log.info(
f'Wrote {mkts_array.size} datums to tsdb\n'
)
for resp in resp.responses:
err = resp.error
if err:
raise MarketStoreError(err)
if r:
to_push = mkts_array[m*limit:]
# write to db
resp = await self.client.write(
to_push,
tbk=f'{fqme}/{tfkey}/OHLCV',
# NOTE: will will append duplicates
# for the same timestamp-index.
# TODO: pre deduplicate?
isvariablelength=append_and_duplicate,
)
log.info(
f'Wrote {mkts_array.size} datums to tsdb\n'
)
for resp in resp.responses:
err = resp.error
if err:
raise MarketStoreError(err)
# XXX: currently the only way to do this is through the CLI:
# sudo ./marketstore connect --dir ~/.config/piker/data
# >> \show mnq.globex.20220617.ib/1Sec/OHLCV 2022-05-15
# and this seems to block and use up mem..
# >> \trim mnq.globex.20220617.ib/1Sec/OHLCV 2022-05-15
# relevant source code for this is here:
# https://github.com/alpacahq/marketstore/blob/master/cmd/connect/session/trim.go#L14
# def delete_range(self, start_dt, end_dt) -> None:
# ...
ohlc_key_map = bidict({
'Epoch': 'time',
'Open': 'open',
'High': 'high',
'Low': 'low',
'Close': 'close',
'Volume': 'volume',
})
@acm
async def get_client(
grpc_port: int, # required
host: str | None,
) -> MarketstoreClient:
'''
Load a ``anyio_marketstore`` grpc client connected
to an existing ``marketstore`` server.
'''
async with open_marketstore_client(
host or 'localhost',
grpc_port,
) as client:
yield Storage(client)