diff --git a/piker/data/history.py b/piker/data/history.py index 182408f3..f8260c86 100644 --- a/piker/data/history.py +++ b/piker/data/history.py @@ -325,8 +325,19 @@ async def start_backfill( f'Writing {ln} frame to storage:\n' f'{start_dt} -> {end_dt}' ) + + if mkt.dst.atype != 'crypto': + # for now, our table key schema is not including + # the dst[/src] source asset token. + col_sym_key: str = mkt.get_fqme( + delim_char='', + without_src=True, + ) + else: + col_sym_key: str = mkt.get_fqme(delim_char='') + await storage.write_ohlcv( - f'{mkt.fqme}', + col_sym_key, to_push, timeframe, ) @@ -632,7 +643,7 @@ async def manage_history( name, uuid = uid service = name.rstrip(f'.{mod.name}') - fqme: str = mkt.fqme + fqme: str = mkt.get_fqme(delim_char='') # (maybe) allocate shm array for this broker/symbol which will # be used for fast near-term history capture and processing. @@ -701,25 +712,28 @@ async def manage_history( ) assert open_history_client - conf, path = config.load('conf') - tsdbconf = conf['network'].get('tsdb') - - # lookup backend tsdb module by name and load any user service - # settings for connecting to the tsdb service. - tsdb_backend: str = tsdbconf.pop('backend') - tsdb_host: str = tsdbconf['host'] - - # TODO: import and load storagemod by name - # mod = get_storagemod(tsdb_backend) - from ..service import marketstore - tsdb_is_up: bool = False try_remote_tsdb: bool = False - if tsdb_host == 'localhost': - log.info('Scanning for existing `{tsbd_backend}`') - tsdb_is_up: bool = await check_for_service(f'{tsdb_backend}d') - else: - try_remote_tsdb: bool = True + + conf, path = config.load('conf', touch_if_dne=True) + net = conf.get('network') + if net: + tsdbconf = net.get('tsdb') + + # lookup backend tsdb module by name and load any user service + # settings for connecting to the tsdb service. + tsdb_backend: str = tsdbconf.pop('backend') + tsdb_host: str = tsdbconf['host'] + + # TODO: import and load storagemod by name + # mod = get_storagemod(tsdb_backend) + from ..service import marketstore + if tsdb_host == 'localhost': + log.info('Scanning for existing `{tsbd_backend}`') + tsdb_is_up: bool = await check_for_service(f'{tsdb_backend}d') + + else: + try_remote_tsdb: bool = True if ( tsdb_is_up