From c020ab76becef4816b1ab267c5254ea7fa6b586a Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 2 Jun 2023 13:20:28 -0400 Subject: [PATCH] Clean out marketstore specifics - drop buncha cruft from `store ls` cmd and make it work for multi-backend fqme listing. - including adding an `.address` to the mkts client which shows the grpc socketaddr details. - change defauls to new `'nativedb'. - drop 'marketstore' from built-in backend list (for now) --- piker/storage/__init__.py | 47 +++++++------ piker/storage/cli.py | 99 ++++----------------------- piker/storage/marketstore/__init__.py | 16 ++++- 3 files changed, 52 insertions(+), 110 deletions(-) diff --git a/piker/storage/__init__.py b/piker/storage/__init__.py index cca77c69..465d3e28 100644 --- a/piker/storage/__init__.py +++ b/piker/storage/__init__.py @@ -62,7 +62,8 @@ get_console_log = partial( __tsdbs__: list[str] = [ - 'marketstore', + 'nativedb', + # 'marketstore', ] @@ -157,13 +158,14 @@ def get_storagemod(name: str) -> ModuleType: @acm async def open_storage_client( - name: str = 'nativedb', + backend: str | None = None, ) -> tuple[ModuleType, StorageClient]: ''' Load the ``StorageClient`` for named backend. ''' + def_backend: str = 'nativedb' tsdb_host: str = 'localhost' # load root config and any tsdb user defined settings @@ -171,44 +173,45 @@ async def open_storage_client( # TODO: maybe not under a "network" section.. since # no more chitty mkts.. - net = conf.get('network') - if net: - tsdbconf = net.get('tsdb') + tsdbconf: dict = {} + service_section = conf.get('service') + if ( + not backend + and service_section + ): + tsdbconf = service_section.get('tsdb') # lookup backend tsdb module by name and load any user service # settings for connecting to the tsdb service. - name: str = tsdbconf.pop('backend') + backend: str = tsdbconf.pop('backend') tsdb_host: str = tsdbconf['host'] - if name is None: - raise RuntimeError('No tsdb backend has been set!?') + if backend is None: + backend: str = def_backend # import and load storagemod by name - mod: ModuleType = get_storagemod(name) + mod: ModuleType = get_storagemod(backend) get_client = mod.get_client log.info('Scanning for existing `{tsbd_backend}`') - tsdb_is_up: bool = await check_for_service(f'{name}d') - if ( - tsdb_host == 'localhost' - or tsdb_is_up - ): - log.info(f'Connecting to local {name}@{tsdbconf}') + if backend != def_backend: + tsdb_is_up: bool = await check_for_service(f'{backend}d') + if ( + tsdb_host == 'localhost' + or tsdb_is_up + ): + log.info(f'Connecting to local: {backend}@{tsdbconf}') + else: + log.info(f'Attempting to connect to remote: {backend}@{tsdbconf}') else: - log.info(f'Attempting to connect to remote {name}@{tsdbconf}') + log.info(f'Connecting to default storage: {backend}@{tsdbconf}') - # try: async with ( get_client(**tsdbconf) as client, ): # slap on our wrapper api yield mod, client - # except Exception as err: - # raise StorageConnectionError( - # f'No connection to {name}' - # ) from err - # NOTE: pretty sure right now this is only being # called by a CLI entrypoint? diff --git a/piker/storage/cli.py b/piker/storage/cli.py index 11d2b490..9ca170c8 100644 --- a/piker/storage/cli.py +++ b/piker/storage/cli.py @@ -59,7 +59,6 @@ def ls( if not backends: backends: list[str] = __tsdbs__ - table = Table(title=f'Table keys for backends {backends}:') console = Console() async def query_all(): @@ -71,17 +70,21 @@ def ls( enable_modules=['piker.service._ahab'], ), ): - for backend in backends: - async with open_storage_client(name=backend) as ( - mod, - client, - ): - table.add_column(f'{mod.name} fqmes') - keys: list[str] = await client.list_keys() - for key in keys: - table.add_row(key) + for i, backend in enumerate(backends): + table = Table() + try: + async with open_storage_client(backend=backend) as ( + mod, + client, + ): + table.add_column(f'{mod.name}@{client.address}') + keys: list[str] = await client.list_keys() + for key in keys: + table.add_row(key) - console.print(table) + console.print(table) + except Exception: + log.error(f'Unable to connect to storage engine: `{backend}`') trio.run(query_all) @@ -194,80 +197,6 @@ def read( trio.run(main) - # if client_type == 'sync': - # import pymarketstore as pymkts - # cli = pymkts.Client() - - - # while end != 0: - # param = pymkts.Params( - # fqme, - # '1Min', - # 'OHLCV', - # limit=limit, - # # limit_from_start=True, - # end=end, - # ) - # if end is not None: - # breakpoint() - # reply = cli.query(param) - # ds: pymkts.results.DataSet = reply.first() - # array: np.ndarray = ds.array - - # print(f'loaded {len(array)}-len array:\n{array}') - - # times = array['Epoch'] - # end: float = float(times[0]) - # dt = pendulum.from_timestamp(end) - # # end: str = dt.isoformat('T') - # breakpoint() - # print( - # f'trying to load next {limit} datums frame starting @ {dt}' - # ) - # else: - # from anyio_marketstore import ( # noqa - # open_marketstore_client, - # MarketstoreClient, - # Params, - # ) - # async def main(): - - # end: int | None = None - - # async with open_marketstore_client( - # 'localhost', - # 5995, - # ) as client: - - # while end != 0: - # params = Params( - # symbols=fqme, - # # timeframe=tfstr, - # timeframe='1Min', - # attrgroup='OHLCV', - # end=end, - # # limit_from_start=True, - - # # TODO: figure the max limit here given the - # # ``purepc`` msg size limit of purerpc: 33554432 - # limit=limit, - # ) - - # if end is not None: - # breakpoint() - # result = await client.query(params) - # data_set = result.by_symbols()[fqme] - # array = data_set.array - # times = array['Epoch'] - # end: float = float(times[0]) - # dt = pendulum.from_timestamp(end) - # breakpoint() - # print( - # f'trying to load next {limit} datums frame starting @ {dt}' - # ) - - # trio.run(main) - @store.command() def clone( diff --git a/piker/storage/marketstore/__init__.py b/piker/storage/marketstore/__init__.py index 416ef7eb..2f0a7970 100644 --- a/piker/storage/marketstore/__init__.py +++ b/piker/storage/marketstore/__init__.py @@ -75,15 +75,22 @@ class MktsStorageClient: def __init__( self, client: MarketstoreClient, + config: dict, ) -> None: # TODO: eventually this should be an api/interface type that # ensures we can support multiple tsdb backends. self.client = client + self._config = config # series' cache from tsdb reads self._arrays: dict[str, np.ndarray] = {} + @property + def address(self) -> str: + conf = self._config + return f'grpc://{conf["host"]}:{conf["port"]}' + async def list_keys(self) -> list[str]: return await self.client.list_symbols() @@ -359,8 +366,8 @@ ohlc_key_map = bidict({ @acm async def get_client( - grpc_port: int, # required - host: str | None, + grpc_port: int = 5995, # required + host: str = 'localhost', ) -> MarketstoreClient: ''' @@ -372,4 +379,7 @@ async def get_client( host or 'localhost', grpc_port, ) as client: - yield MktsStorageClient(client) + yield MktsStorageClient( + client, + config={'host': host, 'port': grpc_port}, + )