diff --git a/piker/data/cli.py b/piker/data/cli.py
index 59db1037..f855717b 100644
--- a/piker/data/cli.py
+++ b/piker/data/cli.py
@@ -121,7 +121,7 @@ def storesh(
     Start an IPython shell ready to query the local marketstore db.

     '''
-    from piker.data.marketstore import open_tsdb_client
+    from piker.storage import open_tsdb_client
     from piker.service import open_piker_runtime

     async def main():
@@ -171,7 +171,7 @@ def storage(
     Start an IPython shell ready to query the local marketstore db.

     '''
-    from piker.service.marketstore import open_tsdb_client
+    from piker.storage import open_tsdb_client
     from piker.service import open_piker_runtime

     async def main():
diff --git a/piker/data/history.py b/piker/data/history.py
index ebfe8c65..a29d2ab9 100644
--- a/piker/data/history.py
+++ b/piker/data/history.py
@@ -28,7 +28,6 @@ import time
 from types import ModuleType
 from typing import (
     Callable,
-    Optional,
     TYPE_CHECKING,
 )

@@ -38,16 +37,12 @@ import tractor
 import pendulum
 import numpy as np

-from .. import config
 from ..accounting import (
     MktPair,
 )
 from ._util import (
     log,
 )
-from ..service import (
-    check_for_service,
-)
 from ._sharedmem import (
     maybe_open_shm_array,
     ShmArray,
@@ -91,8 +86,8 @@ async def start_backfill(
     sampler_stream: tractor.MsgStream,
     feed_is_live: trio.Event,

-    last_tsdb_dt: Optional[datetime] = None,
-    storage: Optional[Storage] = None,
+    last_tsdb_dt: datetime | None = None,
+    storage: Storage | None = None,
     write_tsdb: bool = True,
     tsdb_is_up: bool = False,

@@ -391,7 +386,7 @@ async def basic_backfill(

 async def tsdb_backfill(
     mod: ModuleType,
-    marketstore: ModuleType,
+    storemod: ModuleType,
     bus: _FeedsBus,
     storage: Storage,
     mkt: MktPair,
@@ -542,7 +537,7 @@ async def tsdb_backfill(
                 prepend=True,
                 # update_first=False,
                 # start=prepend_start,
-                field_map=marketstore.ohlc_key_map,
+                field_map=storemod.ohlc_key_map,
             )

             tsdb_last_frame_start = tsdb_history['Epoch'][0]
@@ -580,7 +575,7 @@ async def tsdb_backfill(
         shm.push(
             to_push,
             prepend=True,
-            field_map=marketstore.ohlc_key_map,
+            field_map=storemod.ohlc_key_map,
         )

         log.info(f'Loaded {to_push.shape} datums from storage')
@@ -626,12 +621,11 @@ async def manage_history(

 ) -> None:
     '''
     Load and manage historical data including the loading of any
-    available series from `marketstore` as well as conducting real-time
-    update of both that existing db and the allocated shared memory
-    buffer.
+    available series from any connected tsdb as well as conducting
+    real-time updates of both that existing db and the allocated
+    shared memory buffer.

     '''
-
     # TODO: is there a way to make each shm file key
     # actor-tree-discovery-addr unique so we avoid collisions
     # when doing tests which also allocate shms for certain instruments
@@ -711,52 +705,17 @@ async def manage_history(
         None,
     )
     assert open_history_client
-
-    tsdb_is_up: bool = False
-    try_remote_tsdb: bool = False
-
-    conf, path = config.load('conf', touch_if_dne=True)
-    net = conf.get('network')
-    if net:
-        tsdbconf = net.get('tsdb')
-
-        # lookup backend tsdb module by name and load any user service
-        # settings for connecting to the tsdb service.
-        tsdb_backend: str = tsdbconf.pop('backend')
-        tsdb_host: str = tsdbconf['host']
-
-        # TODO: import and load storagemod by name
-        # mod = get_storagemod(tsdb_backend)
-        from ..service import marketstore
-        if tsdb_host == 'localhost':
-            log.info('Scanning for existing `{tsbd_backend}`')
-            tsdb_is_up: bool = await check_for_service(f'{tsdb_backend}d')
-
-        else:
-            try_remote_tsdb: bool = True
-
-    if (
-        tsdb_is_up
-        or try_remote_tsdb
-        and (
-            opened
-            and open_history_client
-        )
-    ):
-        log.info('Found existing `marketstored`')
-
-        async with (
-            marketstore.open_storage_client(
-                **tsdbconf
-            ) as storage,
-        ):
+    from .. import storage
+    try:
+        async with storage.open_storage_client() as (storemod, client):
+            log.info(f'Found existing `{storemod.name}`')
             # TODO: drop returning the output that we pass in?
             await bus.nursery.start(
                 tsdb_backfill,
                 mod,
-                marketstore,
+                storemod,
                 bus,
-                storage,
+                client,
                 mkt,
                 {
                     1: rt_shm,
@@ -784,11 +743,11 @@ async def manage_history(
             # what data is loaded for viewing.
             await trio.sleep_forever()

-    # load less history if no tsdb can be found
-    elif (
-        not tsdb_is_up
-        and opened
-    ):
+    except storage.StorageConnectionError:
+        log.exception(
+            "Can't connect to tsdb backend!?\n"
+            'Starting basic backfill to shm..'
+        )
         await basic_backfill(
             bus,
             mod,
diff --git a/piker/service/marketstore.py b/piker/service/marketstore.py
index 93656ab3..c9f49420 100644
--- a/piker/service/marketstore.py
+++ b/piker/service/marketstore.py
@@ -327,16 +327,6 @@ _ohlcv_dt = [
 ]


-ohlc_key_map = bidict({
-    'Epoch': 'time',
-    'Open': 'open',
-    'High': 'high',
-    'Low': 'low',
-    'Close': 'close',
-    'Volume': 'volume',
-})
-
-
 def mk_tbk(keys: tuple[str, str, str]) -> str:
     '''
     Generate a marketstore table key from a tuple.
@@ -388,24 +378,6 @@ def quote_to_marketstore_structarray(
     return np.array([tuple(array_input)], dtype=_quote_dt)


-@acm
-async def get_client(
-    host: str | None,
-    port: int | None,
-
-) -> MarketstoreClient:
-    '''
-    Load a ``anyio_marketstore`` grpc client connected
-    to an existing ``marketstore`` server.
-
-    '''
-    async with open_marketstore_client(
-        host or 'localhost',
-        port or _config['grpc_listen_port'],
-    ) as client:
-        yield client
-
-
 class MarketStoreError(Exception):
     "Generic marketstore client error"
@@ -444,6 +416,7 @@ async def ingest_quote_stream(
     Ingest a broker quote stream into a ``marketstore`` tsdb.

     '''
+    from piker.storage.marketstore import get_client
     async with (
         maybe_open_feed(brokername, symbols, loglevel=loglevel) as feed,
         get_client() as ms_client,
diff --git a/piker/storage/__init__.py b/piker/storage/__init__.py
index 8169f4ef..3aeefc37 100644
--- a/piker/storage/__init__.py
+++ b/piker/storage/__init__.py
@@ -25,68 +25,67 @@
 - test harness utilities for data-processing verification.

 '''
-from __future__ import annotations
+from abc import abstractmethod
 from contextlib import asynccontextmanager as acm
+from functools import partial
+from importlib import import_module
 from datetime import datetime
-# from pprint import pformat
+from types import ModuleType
 from typing import (
-    Union,
+    # Callable,
+    # Awaitable,
+    # Any,
+    # AsyncIterator,
+    Protocol,
+    # Generic,
+    # TypeVar,
 )

-import tractor
 import numpy as np
-from anyio_marketstore import (
-    Params,
+
+
+from .. import config
+from ..service import (
+    check_for_service,
 )
-import pendulum
-import purerpc
-
-from .. import config
-from ..service.marketstore import (
-    MarketstoreClient,
-    tf_in_1s,
-    mk_tbk,
-    _ohlcv_dt,
-    MarketStoreError,
+from ..log import (
+    get_logger,
+    get_console_log,
+)
+subsys: str = 'piker.storage'
+
+log = get_logger(subsys)
+get_console_log = partial(
+    get_console_log,
+    name=subsys,
 )
-from ..data.feed import maybe_open_feed
-from ..log import get_logger
-from .._profile import Profiler
-
-log = get_logger(__name__)
-
-
-class Storage:
+
+class Storage(
+    Protocol,
+):
     '''
-    High level storage api for both real-time and historical ingest.
+    Api description that all storage backends must implement
+    in order to suffice the historical data mgmt layer.

     '''
-    def __init__(
-        self,
-        client: MarketstoreClient,
-
-    ) -> None:
-        # TODO: eventually this should be an api/interface type that
-        # ensures we can support multiple tsdb backends.
-        self.client = client
-
-        # series' cache from tsdb reads
-        self._arrays: dict[str, np.ndarray] = {}
-
+    @abstractmethod
     async def list_keys(self) -> list[str]:
-        return await self.client.list_symbols()
-
-    async def search_keys(self, pattern: str) -> list[str]:
-        '''
-        Search for time series key in the storage backend.
-
-        '''
         ...

-    async def write_ticks(self, ticks: list) -> None:
+    @abstractmethod
+    def search_keys(self) -> list[str]:
+        ...

+    # @abstractmethod
+    # async def write_ticks(
+    #     self,
+    #     ticks: list,
+    # ) -> ReceiveType:
+    #     ...
+
+    # ``trio.abc.AsyncResource`` methods
+    @abstractmethod
     async def load(
         self,
         fqme: str,
@@ -97,30 +96,19 @@ class Storage:
         datetime | None,  # first dt
         datetime | None,  # last dt
     ]:
+        ...

-        first_tsdb_dt, last_tsdb_dt = None, None
-        hist = await self.read_ohlcv(
-            fqme,
-            # on first load we don't need to pull the max
-            # history per request size worth.
-            limit=3000,
-            timeframe=timeframe,
-        )
-        log.info(f'Loaded tsdb history {hist}')
+    @abstractmethod
+    async def delete_ts(
+        self,
+        key: str,
+        timeframe: int | str | None = None,
+        fmt: str = 'OHLCV',

-        if len(hist):
-            times = hist['Epoch']
-            first, last = times[0], times[-1]
-            first_tsdb_dt, last_tsdb_dt = map(
-                pendulum.from_timestamp, [first, last]
-            )
-
-        return (
-            hist,  # array-data
-            first_tsdb_dt,  # start of query-frame
-            last_tsdb_dt,  # most recent
-        )
+    ) -> bool:
+        ...

+    @abstractmethod
     async def read_ohlcv(
         self,
         fqme: str,
@@ -129,94 +117,7 @@ class Storage:
         limit: int = int(800e3),

     ) -> np.ndarray:
-
-        client = self.client
-        syms = await client.list_symbols()
-
-        if fqme not in syms:
-            return {}
-
-        # use the provided timeframe or 1s by default
-        tfstr = tf_in_1s.get(timeframe, tf_in_1s[1])
-
-        params = Params(
-            symbols=fqme,
-            timeframe=tfstr,
-            attrgroup='OHLCV',
-            end=end,
-            # limit_from_start=True,
-
-            # TODO: figure the max limit here given the
-            # ``purepc`` msg size limit of purerpc: 33554432
-            limit=limit,
-        )
-
-        for i in range(3):
-            try:
-                result = await client.query(params)
-                break
-            except purerpc.grpclib.exceptions.UnknownError as err:
-                if 'snappy' in err.args:
-                    await tractor.breakpoint()
-
-                # indicate there is no history for this timeframe
-                log.exception(
-                    f'Unknown mkts QUERY error: {params}\n'
-                    f'{err.args}'
-                )
-        else:
-            return {}
-
-        # TODO: it turns out column access on recarrays is actually slower:
-        # https://jakevdp.github.io/PythonDataScienceHandbook/02.09-structured-data-numpy.html#RecordArrays:-Structured-Arrays-with-a-Twist
-        # it might make sense to make these structured arrays?
-        data_set = result.by_symbols()[fqme]
-        array = data_set.array
-
-        # XXX: ensure sample rate is as expected
-        time = data_set.array['Epoch']
-        if len(time) > 1:
-            time_step = time[-1] - time[-2]
-            ts = tf_in_1s.inverse[data_set.timeframe]
-
-            if time_step != ts:
-                log.warning(
-                    f'MKTS BUG: wrong timeframe loaded: {time_step}'
-                    'YOUR DATABASE LIKELY CONTAINS BAD DATA FROM AN OLD BUG'
-                    f'WIPING HISTORY FOR {ts}s'
-                )
-                await self.delete_ts(fqme, timeframe)
-
-                # try reading again..
-                return await self.read_ohlcv(
-                    fqme,
-                    timeframe,
-                    end,
-                    limit,
-                )
-
-        return array
-
-    async def delete_ts(
-        self,
-        key: str,
-        timeframe: Union[int, str | None] = None,
-        fmt: str = 'OHLCV',
-
-    ) -> bool:
-
-        client = self.client
-        syms = await client.list_symbols()
-        if key not in syms:
-            await tractor.breakpoint()
-            raise KeyError(f'`{key}` table key not found in\n{syms}?')
-
-        tbk = mk_tbk((
-            key,
-            tf_in_1s.get(timeframe, tf_in_1s[60]),
-            fmt,
-        ))
-        return await client.destroy(tbk=tbk)
+        ...

     async def write_ohlcv(
         self,
@@ -227,106 +128,74 @@ class Storage:
         limit: int = int(800e3),

     ) -> None:
-        # build mkts schema compat array for writing
-        mkts_dt = np.dtype(_ohlcv_dt)
-        mkts_array = np.zeros(
-            len(ohlcv),
-            dtype=mkts_dt,
-        )
-        # copy from shm array (yes it's this easy):
-        # https://numpy.org/doc/stable/user/basics.rec.html#assignment-from-other-structured-arrays
-        mkts_array[:] = ohlcv[[
-            'time',
-            'open',
-            'high',
-            'low',
-            'close',
-            'volume',
-        ]]
+        ...

-        m, r = divmod(len(mkts_array), limit)

-        tfkey = tf_in_1s[timeframe]
-        for i in range(m, 1):
-            to_push = mkts_array[i-1:i*limit]
+class StorageConnectionError(ConnectionError):
+    '''
+    Can't connect to the desired tsdb subsys/service.

-            # write to db
-            resp = await self.client.write(
-                to_push,
-                tbk=f'{fqme}/{tfkey}/OHLCV',
+    '''

-                # NOTE: will will append duplicates
-                # for the same timestamp-index.
-                # TODO: pre-deduplicate?
-                isvariablelength=append_and_duplicate,
-            )
+def get_storagemod(name: str) -> ModuleType:
+    mod: ModuleType = import_module(
+        '.' + name,
+        'piker.storage',
+    )

-            log.info(
-                f'Wrote {mkts_array.size} datums to tsdb\n'
-            )
-
-            for resp in resp.responses:
-                err = resp.error
-                if err:
-                    raise MarketStoreError(err)
-
-        if r:
-            to_push = mkts_array[m*limit:]
-
-            # write to db
-            resp = await self.client.write(
-                to_push,
-                tbk=f'{fqme}/{tfkey}/OHLCV',
-
-                # NOTE: will will append duplicates
-                # for the same timestamp-index.
-                # TODO: pre deduplicate?
-                isvariablelength=append_and_duplicate,
-            )
-
-            log.info(
-                f'Wrote {mkts_array.size} datums to tsdb\n'
-            )
-
-            for resp in resp.responses:
-                err = resp.error
-                if err:
-                    raise MarketStoreError(err)
-
-    # XXX: currently the only way to do this is through the CLI:
-
-    # sudo ./marketstore connect --dir ~/.config/piker/data
-    # >> \show mnq.globex.20220617.ib/1Sec/OHLCV 2022-05-15
-    # and this seems to block and use up mem..
-    # >> \trim mnq.globex.20220617.ib/1Sec/OHLCV 2022-05-15
-
-    # relevant source code for this is here:
-    # https://github.com/alpacahq/marketstore/blob/master/cmd/connect/session/trim.go#L14
-    # def delete_range(self, start_dt, end_dt) -> None:
-    #     ...
+    # we only allow monkeying because it's for internal keying
+    mod.name = mod.__name__.split('.')[-1]
+    return mod


 @acm
 async def open_storage_client(
-    host: str,
-    grpc_port: int,
+    name: str | None = None,

-) -> tuple[Storage, dict[str, np.ndarray]]:
+) -> tuple[ModuleType, Storage]:
     '''
-    Load a series by key and deliver in ``numpy`` struct array format.
+    Load the ``Storage`` client for the named backend.

     '''
-    from piker.service.marketstore import get_client
+    # load root config for tsdb
+    conf, path = config.load('conf', touch_if_dne=True)
+    net = conf.get('network')
+    if net:
+        tsdbconf = net.get('tsdb')

-    async with (
-        # eventually a storage backend endpoint
-        get_client(
-            host=host,
-            port=grpc_port,
-        ) as client,
+        # lookup backend tsdb module by name and load any user service
+        # settings for connecting to the tsdb service.
+        name: str = tsdbconf.pop('backend')
+        tsdb_host: str = tsdbconf['host']
+
+    if name is None:
+        raise RuntimeError('No tsdb backend has been set!?')
+
+    # import and load storagemod by name
+    mod: ModuleType = get_storagemod(name)
+    get_client = mod.get_client
+
+    log.info(f'Scanning for existing `{name}d`')
+    tsdb_is_up: bool = await check_for_service(f'{name}d')
+    if (
+        tsdb_host == 'localhost'
+        or tsdb_is_up
     ):
-        # slap on our wrapper api
-        yield Storage(client)
+        log.info(f'Connecting to local {name}@{tsdbconf}')
+    else:
+        log.info(f'Attempting to connect to remote {name}@{tsdbconf}')
+
+    try:
+        async with (
+            get_client(**tsdbconf) as client,
+        ):
+            # slap on our wrapper api
+            yield mod, client
+
+    except Exception as err:
+        raise StorageConnectionError(
+            f'No connection to {name}'
+        ) from err


 # NOTE: pretty sure right now this is only being
@@ -362,22 +231,15 @@ async def open_tsdb_client(
     # * the original data feed arch blurb:
     #   - https://github.com/pikers/piker/issues/98
     #
+    from .._profile import Profiler
     profiler = Profiler(
         disabled=True,  # not pg_profile_enabled(),
         delayed=False,
     )
+    from ..data.feed import maybe_open_feed

-    # load any user service settings for connecting to
-    rootconf, path = config.load(
-        'conf',
-        touch_if_dne=True,
-    )
-    tsdbconf = rootconf['network'].get('tsdb')
-    # backend = tsdbconf.pop('backend')
     async with (
-        open_storage_client(
-            **tsdbconf,
-        ) as storage,
+        open_storage_client() as (_, storage),

         maybe_open_feed(
             [fqme],
diff --git a/piker/storage/marketstore.py b/piker/storage/marketstore.py
new file mode 100644
index 00000000..9aad2230
--- /dev/null
+++ b/piker/storage/marketstore.py
@@ -0,0 +1,342 @@
+# piker: trading gear for hackers
+# Copyright (C) Tyler Goodlet (in stewardship for pikers)
+
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU Affero General Public License for more details.
+
+# You should have received a copy of the GNU Affero General Public License
+# along with this program.  If not, see <https://www.gnu.org/licenses/>.
+
+'''
+marketstore tsdb backend:
+https://github.com/alpacahq/marketstore
+
+
+We wrote an async gRPC client:
+https://github.com/pikers/anyio-marketstore
+
+which is normally preferred, minus the issues discovered
+in https://github.com/pikers/piker/issues/443 which are
+the main reason we are moving away from this
+platform..
+
+'''
+from __future__ import annotations
+from contextlib import asynccontextmanager as acm
+from datetime import datetime
+# from pprint import pformat
+from typing import (
+    Union,
+)
+
+from bidict import bidict
+import tractor
+import numpy as np
+from anyio_marketstore import (
+    Params,
+)
+import pendulum
+import purerpc
+
+from ..service.marketstore import (
+    MarketstoreClient,
+    tf_in_1s,
+    mk_tbk,
+    _ohlcv_dt,
+    MarketStoreError,
+)
+from anyio_marketstore import (  # noqa
+    open_marketstore_client,
+    MarketstoreClient,
+    Params,
+)
+from ..log import get_logger
+# from .._profile import Profiler
+
+
+log = get_logger(__name__)
+
+
+class Storage:
+    '''
+    High level storage api for both real-time and historical ingest.
+
+    '''
+    def __init__(
+        self,
+        client: MarketstoreClient,
+
+    ) -> None:
+        # TODO: eventually this should be an api/interface type that
+        # ensures we can support multiple tsdb backends.
+        self.client = client
+
+        # series' cache from tsdb reads
+        self._arrays: dict[str, np.ndarray] = {}
+
+    async def list_keys(self) -> list[str]:
+        return await self.client.list_symbols()
+
+    async def search_keys(self, pattern: str) -> list[str]:
+        '''
+        Search for time series key in the storage backend.
+
+        '''
+        ...
+
+    async def write_ticks(self, ticks: list) -> None:
+        ...
+
+    async def load(
+        self,
+        fqme: str,
+        timeframe: int,
+
+    ) -> tuple[
+        np.ndarray,  # timeframe sampled array-series
+        datetime | None,  # first dt
+        datetime | None,  # last dt
+    ]:
+
+        first_tsdb_dt, last_tsdb_dt = None, None
+        hist = await self.read_ohlcv(
+            fqme,
+            # on first load we don't need to pull the max
+            # history per request size worth.
+            limit=3000,
+            timeframe=timeframe,
+        )
+        log.info(f'Loaded tsdb history {hist}')
+
+        if len(hist):
+            times = hist['Epoch']
+            first, last = times[0], times[-1]
+            first_tsdb_dt, last_tsdb_dt = map(
+                pendulum.from_timestamp, [first, last]
+            )
+
+        return (
+            hist,  # array-data
+            first_tsdb_dt,  # start of query-frame
+            last_tsdb_dt,  # most recent
+        )
+
+    async def read_ohlcv(
+        self,
+        fqme: str,
+        timeframe: int | str,
+        end: int | None = None,
+        limit: int = int(800e3),
+
+    ) -> np.ndarray:
+
+        client = self.client
+        syms = await client.list_symbols()
+
+        if fqme not in syms:
+            return {}
+
+        # use the provided timeframe or 1s by default
+        tfstr = tf_in_1s.get(timeframe, tf_in_1s[1])
+
+        params = Params(
+            symbols=fqme,
+            timeframe=tfstr,
+            attrgroup='OHLCV',
+            end=end,
+            # limit_from_start=True,
+
+            # TODO: figure the max limit here given the
+            # ``purepc`` msg size limit of purerpc: 33554432
+            limit=limit,
+        )
+
+        for i in range(3):
+            try:
+                result = await client.query(params)
+                break
+            except purerpc.grpclib.exceptions.UnknownError as err:
+                if 'snappy' in err.args:
+                    await tractor.breakpoint()
+
+                # indicate there is no history for this timeframe
+                log.exception(
+                    f'Unknown mkts QUERY error: {params}\n'
+                    f'{err.args}'
+                )
+        else:
+            return {}
+
+        # TODO: it turns out column access on recarrays is actually slower:
+        # https://jakevdp.github.io/PythonDataScienceHandbook/02.09-structured-data-numpy.html#RecordArrays:-Structured-Arrays-with-a-Twist
+        # it might make sense to make these structured arrays?
+        data_set = result.by_symbols()[fqme]
+        array = data_set.array
+
+        # XXX: ensure sample rate is as expected
+        time = data_set.array['Epoch']
+        if len(time) > 1:
+            time_step = time[-1] - time[-2]
+            ts = tf_in_1s.inverse[data_set.timeframe]
+
+            if time_step != ts:
+                log.warning(
+                    f'MKTS BUG: wrong timeframe loaded: {time_step}\n'
+                    'YOUR DATABASE LIKELY CONTAINS BAD DATA FROM AN OLD BUG\n'
+                    f'WIPING HISTORY FOR {ts}s'
+                )
+                await self.delete_ts(fqme, timeframe)
+
+                # try reading again..
+                return await self.read_ohlcv(
+                    fqme,
+                    timeframe,
+                    end,
+                    limit,
+                )
+
+        return array
+
+    async def delete_ts(
+        self,
+        key: str,
+        timeframe: Union[int, str | None] = None,
+        fmt: str = 'OHLCV',
+
+    ) -> bool:
+
+        client = self.client
+        syms = await client.list_symbols()
+        if key not in syms:
+            await tractor.breakpoint()
+            raise KeyError(f'`{key}` table key not found in\n{syms}?')
+
+        tbk = mk_tbk((
+            key,
+            tf_in_1s.get(timeframe, tf_in_1s[60]),
+            fmt,
+        ))
+        return await client.destroy(tbk=tbk)
+
+    async def write_ohlcv(
+        self,
+        fqme: str,
+        ohlcv: np.ndarray,
+        timeframe: int,
+        append_and_duplicate: bool = True,
+        limit: int = int(800e3),
+
+    ) -> None:
+        # build mkts schema compat array for writing
+        mkts_dt = np.dtype(_ohlcv_dt)
+        mkts_array = np.zeros(
+            len(ohlcv),
+            dtype=mkts_dt,
+        )
+        # copy from shm array (yes it's this easy):
+        # https://numpy.org/doc/stable/user/basics.rec.html#assignment-from-other-structured-arrays
+        mkts_array[:] = ohlcv[[
+            'time',
+            'open',
+            'high',
+            'low',
+            'close',
+            'volume',
+        ]]
+
+        m, r = divmod(len(mkts_array), limit)
+
+        tfkey = tf_in_1s[timeframe]
+        for i in range(m, 1):
+            to_push = mkts_array[i-1:i*limit]
+
+            # write to db
+            resp = await self.client.write(
+                to_push,
+                tbk=f'{fqme}/{tfkey}/OHLCV',
+
+                # NOTE: we will append duplicates
+                # for the same timestamp-index.
+                # TODO: pre-deduplicate?
+                isvariablelength=append_and_duplicate,
+            )
+
+            log.info(
+                f'Wrote {mkts_array.size} datums to tsdb\n'
+            )
+
+            for resp in resp.responses:
+                err = resp.error
+                if err:
+                    raise MarketStoreError(err)
+
+        if r:
+            to_push = mkts_array[m*limit:]
+
+            # write to db
+            resp = await self.client.write(
+                to_push,
+                tbk=f'{fqme}/{tfkey}/OHLCV',
+
+                # NOTE: we will append duplicates
+                # for the same timestamp-index.
+                # TODO: pre deduplicate?
+                isvariablelength=append_and_duplicate,
+            )
+
+            log.info(
+                f'Wrote {mkts_array.size} datums to tsdb\n'
+            )
+
+            for resp in resp.responses:
+                err = resp.error
+                if err:
+                    raise MarketStoreError(err)
+
+    # XXX: currently the only way to do this is through the CLI:
+
+    # sudo ./marketstore connect --dir ~/.config/piker/data
+    # >> \show mnq.globex.20220617.ib/1Sec/OHLCV 2022-05-15
+    # and this seems to block and use up mem..
+    # >> \trim mnq.globex.20220617.ib/1Sec/OHLCV 2022-05-15
+
+    # relevant source code for this is here:
+    # https://github.com/alpacahq/marketstore/blob/master/cmd/connect/session/trim.go#L14
+    # def delete_range(self, start_dt, end_dt) -> None:
+    #     ...
+
+
+ohlc_key_map = bidict({
+    'Epoch': 'time',
+    'Open': 'open',
+    'High': 'high',
+    'Low': 'low',
+    'Close': 'close',
+    'Volume': 'volume',
+})
+
+
+@acm
+async def get_client(
+    grpc_port: int,  # required
+    host: str | None,
+
+) -> Storage:
+    '''
+    Load an ``anyio_marketstore`` grpc client connected
+    to an existing ``marketstore`` server and wrap it in
+    our ``Storage`` api.
+
+    '''
+    async with open_marketstore_client(
+        host or 'localhost',
+        grpc_port,
+    ) as client:
+        yield Storage(client)
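
NB for reviewers: with `Storage` now a `Protocol` and backends resolved at runtime via `get_storagemod()`, the implied contract for a new tsdb backend is a module under `piker/storage/` exposing an `@acm`-style `get_client()` which yields a protocol-compatible client, plus an `ohlc_key_map` `bidict` (read off the module by `tsdb_backfill()` for shm field re-mapping). A minimal in-memory sketch for illustration only — the `dummydb` module name and all internals below are hypothetical, not part of this patch, and the protocol itself is clearly still settling (eg. `write_ticks()` is commented out):

```python
# hypothetical `piker/storage/dummydb.py` (illustrative, not in this patch)
from __future__ import annotations
from contextlib import asynccontextmanager as acm
from datetime import datetime

from bidict import bidict
import numpy as np
import pendulum

# `tsdb_backfill()` reads this off the backend mod to re-map tsdb
# column names to shm fields; an identity map here since we store
# shm-format structured arrays directly.
ohlc_key_map = bidict({
    'time': 'time',
    'open': 'open',
    'high': 'high',
    'low': 'low',
    'close': 'close',
    'volume': 'volume',
})


class Client:
    '''
    In-memory stand-in satisfying the `Storage` protocol.

    '''
    def __init__(self) -> None:
        self._series: dict[str, np.ndarray] = {}

    async def list_keys(self) -> list[str]:
        return list(self._series)

    def search_keys(self, pattern: str) -> list[str]:
        return [key for key in self._series if pattern in key]

    async def load(
        self,
        fqme: str,
        timeframe: int,

    ) -> tuple[
        np.ndarray,
        datetime | None,  # first dt
        datetime | None,  # last dt
    ]:
        hist = await self.read_ohlcv(fqme, timeframe)
        if not len(hist):
            return hist, None, None

        times = hist['time']
        return (
            hist,
            pendulum.from_timestamp(times[0]),
            pendulum.from_timestamp(times[-1]),
        )

    async def read_ohlcv(
        self,
        fqme: str,
        timeframe: int | str,
        end: int | None = None,
        limit: int = int(800e3),

    ) -> np.ndarray:
        # NOTE: keyed by fqme only for brevity; a real backend
        # would key per-timeframe as well.
        return self._series.get(fqme, np.empty(0))

    async def write_ohlcv(
        self,
        fqme: str,
        ohlcv: np.ndarray,
        timeframe: int,

    ) -> None:
        self._series[fqme] = ohlcv

    async def delete_ts(
        self,
        key: str,
        timeframe: int | str | None = None,
        fmt: str = 'OHLCV',

    ) -> bool:
        return self._series.pop(key, None) is not None


@acm
async def get_client(
    **conf,  # would receive the `[network.tsdb]` table minus 'backend'
) -> Client:
    # nothing to connect to here; a real backend should let connection
    # failures propagate so `open_storage_client()` can re-raise them
    # as `StorageConnectionError`.
    yield Client()
```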
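With backend selection now driven purely by config, a connectivity smoke test reduces to the sketch below; it assumes a local piker runtime plus a `[network.tsdb]` table in `conf.toml` providing the `backend` and `host` keys which `open_storage_client()` reads above:

```python
import trio

from piker.service import open_piker_runtime
from piker import storage


async def main():
    async with (
        open_piker_runtime('storage_smoke_test'),

        # backend mod + client are both delivered so callers can
        # reach per-backend attrs like `storemod.ohlc_key_map`.
        storage.open_storage_client() as (storemod, client),
    ):
        # `storemod.name` is the monkey-set key from `get_storagemod()`
        print(f'connected via backend: {storemod.name}')
        print(await client.list_keys())


if __name__ == '__main__':
    trio.run(main)
```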