Clean out marketstore specifics

- drop buncha cruft from `store ls` cmd and make it work for
  multi-backend fqme listing.
  - including adding an `.address` to the mkts client which shows the
    grpc socketaddr details.
- change defauls to new `'nativedb'.
- drop 'marketstore' from built-in backend list (for now)
basic_buy_bot
Tyler Goodlet 2023-06-02 13:20:28 -04:00
parent c52e889fe5
commit c020ab76be
3 changed files with 52 additions and 110 deletions
piker/storage

View File

@ -62,7 +62,8 @@ get_console_log = partial(
__tsdbs__: list[str] = [ __tsdbs__: list[str] = [
'marketstore', 'nativedb',
# 'marketstore',
] ]
@ -157,13 +158,14 @@ def get_storagemod(name: str) -> ModuleType:
@acm @acm
async def open_storage_client( async def open_storage_client(
name: str = 'nativedb', backend: str | None = None,
) -> tuple[ModuleType, StorageClient]: ) -> tuple[ModuleType, StorageClient]:
''' '''
Load the ``StorageClient`` for named backend. Load the ``StorageClient`` for named backend.
''' '''
def_backend: str = 'nativedb'
tsdb_host: str = 'localhost' tsdb_host: str = 'localhost'
# load root config and any tsdb user defined settings # load root config and any tsdb user defined settings
@ -171,44 +173,45 @@ async def open_storage_client(
# TODO: maybe not under a "network" section.. since # TODO: maybe not under a "network" section.. since
# no more chitty mkts.. # no more chitty mkts..
net = conf.get('network') tsdbconf: dict = {}
if net: service_section = conf.get('service')
tsdbconf = net.get('tsdb') if (
not backend
and service_section
):
tsdbconf = service_section.get('tsdb')
# lookup backend tsdb module by name and load any user service # lookup backend tsdb module by name and load any user service
# settings for connecting to the tsdb service. # settings for connecting to the tsdb service.
name: str = tsdbconf.pop('backend') backend: str = tsdbconf.pop('backend')
tsdb_host: str = tsdbconf['host'] tsdb_host: str = tsdbconf['host']
if name is None: if backend is None:
raise RuntimeError('No tsdb backend has been set!?') backend: str = def_backend
# import and load storagemod by name # import and load storagemod by name
mod: ModuleType = get_storagemod(name) mod: ModuleType = get_storagemod(backend)
get_client = mod.get_client get_client = mod.get_client
log.info('Scanning for existing `{tsbd_backend}`') log.info('Scanning for existing `{tsbd_backend}`')
tsdb_is_up: bool = await check_for_service(f'{name}d') if backend != def_backend:
tsdb_is_up: bool = await check_for_service(f'{backend}d')
if ( if (
tsdb_host == 'localhost' tsdb_host == 'localhost'
or tsdb_is_up or tsdb_is_up
): ):
log.info(f'Connecting to local {name}@{tsdbconf}') log.info(f'Connecting to local: {backend}@{tsdbconf}')
else: else:
log.info(f'Attempting to connect to remote {name}@{tsdbconf}') log.info(f'Attempting to connect to remote: {backend}@{tsdbconf}')
else:
log.info(f'Connecting to default storage: {backend}@{tsdbconf}')
# try:
async with ( async with (
get_client(**tsdbconf) as client, get_client(**tsdbconf) as client,
): ):
# slap on our wrapper api # slap on our wrapper api
yield mod, client yield mod, client
# except Exception as err:
# raise StorageConnectionError(
# f'No connection to {name}'
# ) from err
# NOTE: pretty sure right now this is only being # NOTE: pretty sure right now this is only being
# called by a CLI entrypoint? # called by a CLI entrypoint?

View File

@ -59,7 +59,6 @@ def ls(
if not backends: if not backends:
backends: list[str] = __tsdbs__ backends: list[str] = __tsdbs__
table = Table(title=f'Table keys for backends {backends}:')
console = Console() console = Console()
async def query_all(): async def query_all():
@ -71,17 +70,21 @@ def ls(
enable_modules=['piker.service._ahab'], enable_modules=['piker.service._ahab'],
), ),
): ):
for backend in backends: for i, backend in enumerate(backends):
async with open_storage_client(name=backend) as ( table = Table()
try:
async with open_storage_client(backend=backend) as (
mod, mod,
client, client,
): ):
table.add_column(f'{mod.name} fqmes') table.add_column(f'{mod.name}@{client.address}')
keys: list[str] = await client.list_keys() keys: list[str] = await client.list_keys()
for key in keys: for key in keys:
table.add_row(key) table.add_row(key)
console.print(table) console.print(table)
except Exception:
log.error(f'Unable to connect to storage engine: `{backend}`')
trio.run(query_all) trio.run(query_all)
@ -194,80 +197,6 @@ def read(
trio.run(main) trio.run(main)
# if client_type == 'sync':
# import pymarketstore as pymkts
# cli = pymkts.Client()
# while end != 0:
# param = pymkts.Params(
# fqme,
# '1Min',
# 'OHLCV',
# limit=limit,
# # limit_from_start=True,
# end=end,
# )
# if end is not None:
# breakpoint()
# reply = cli.query(param)
# ds: pymkts.results.DataSet = reply.first()
# array: np.ndarray = ds.array
# print(f'loaded {len(array)}-len array:\n{array}')
# times = array['Epoch']
# end: float = float(times[0])
# dt = pendulum.from_timestamp(end)
# # end: str = dt.isoformat('T')
# breakpoint()
# print(
# f'trying to load next {limit} datums frame starting @ {dt}'
# )
# else:
# from anyio_marketstore import ( # noqa
# open_marketstore_client,
# MarketstoreClient,
# Params,
# )
# async def main():
# end: int | None = None
# async with open_marketstore_client(
# 'localhost',
# 5995,
# ) as client:
# while end != 0:
# params = Params(
# symbols=fqme,
# # timeframe=tfstr,
# timeframe='1Min',
# attrgroup='OHLCV',
# end=end,
# # limit_from_start=True,
# # TODO: figure the max limit here given the
# # ``purepc`` msg size limit of purerpc: 33554432
# limit=limit,
# )
# if end is not None:
# breakpoint()
# result = await client.query(params)
# data_set = result.by_symbols()[fqme]
# array = data_set.array
# times = array['Epoch']
# end: float = float(times[0])
# dt = pendulum.from_timestamp(end)
# breakpoint()
# print(
# f'trying to load next {limit} datums frame starting @ {dt}'
# )
# trio.run(main)
@store.command() @store.command()
def clone( def clone(

View File

@ -75,15 +75,22 @@ class MktsStorageClient:
def __init__( def __init__(
self, self,
client: MarketstoreClient, client: MarketstoreClient,
config: dict,
) -> None: ) -> None:
# TODO: eventually this should be an api/interface type that # TODO: eventually this should be an api/interface type that
# ensures we can support multiple tsdb backends. # ensures we can support multiple tsdb backends.
self.client = client self.client = client
self._config = config
# series' cache from tsdb reads # series' cache from tsdb reads
self._arrays: dict[str, np.ndarray] = {} self._arrays: dict[str, np.ndarray] = {}
@property
def address(self) -> str:
conf = self._config
return f'grpc://{conf["host"]}:{conf["port"]}'
async def list_keys(self) -> list[str]: async def list_keys(self) -> list[str]:
return await self.client.list_symbols() return await self.client.list_symbols()
@ -359,8 +366,8 @@ ohlc_key_map = bidict({
@acm @acm
async def get_client( async def get_client(
grpc_port: int, # required grpc_port: int = 5995, # required
host: str | None, host: str = 'localhost',
) -> MarketstoreClient: ) -> MarketstoreClient:
''' '''
@ -372,4 +379,7 @@ async def get_client(
host or 'localhost', host or 'localhost',
grpc_port, grpc_port,
) as client: ) as client:
yield MktsStorageClient(client) yield MktsStorageClient(
client,
config={'host': host, 'port': grpc_port},
)