Pass and use `MktPair` throughout history routines

Previously we were passing the `fqme: str` which isn't as extensive nor
were we able to pass `MktPair` direct to backend history manager-loading
routines (which should be able to rely on always receiving it since
currently `stream_quotes()` is always called first for setup).

This also starts a slight bit of configuration oriented tsdb info
loading (via a new `conf.toml`) such that a user can decide to host
their (marketstore) db on a remote host and our container spawning and
client code will do the right startup automatically based on the config.
|-> Related to this I've added some comments about doing storage
backend module loading which should get actually written out as part of
patches coming in #486 (or something related).

Don't allow overruns again in history context since it seems it was
never a problem?
master
Tyler Goodlet 2023-05-17 10:19:14 -04:00
parent 5c8a45c64a
commit ae049eb84f
2 changed files with 70 additions and 35 deletions

View File

@ -333,7 +333,7 @@ async def allocate_persistent_feed(
manage_history, manage_history,
mod, mod,
bus, bus,
fqme, mkt,
some_data_ready, some_data_ready,
feed_is_live, feed_is_live,
) )
@ -378,7 +378,12 @@ async def allocate_persistent_feed(
# NOTE: if not configured otherwise, we always sum tick volume # NOTE: if not configured otherwise, we always sum tick volume
# values in the OHLCV sampler. # values in the OHLCV sampler.
sum_tick_vlm: bool = (init.shm_write_opts or {}).get('sum_tick_vlm', True) sum_tick_vlm: bool = True
if init.shm_write_opts:
sum_tick_vlm: bool = init.shm_write_opts.get(
'sum_tick_vlm',
True,
)
# NOTE: if no high-freq sampled data has (yet) been loaded, # NOTE: if no high-freq sampled data has (yet) been loaded,
# seed the buffer with a history datum - this is most handy # seed the buffer with a history datum - this is most handy
@ -525,7 +530,7 @@ async def open_feed_bus(
# NOTE we allow this since it's common to have the live # NOTE we allow this since it's common to have the live
# quote feed actor's sampling task push faster then the # quote feed actor's sampling task push faster then the
# the local UI-graphics code during startup. # the local UI-graphics code during startup.
allow_overruns=True, # allow_overruns=True,
) as stream, ) as stream,
): ):

View File

@ -38,6 +38,11 @@ import tractor
import pendulum import pendulum
import numpy as np import numpy as np
from .. import config
from ..accounting._mktinfo import (
MktPair,
unpack_fqme,
)
from ._util import ( from ._util import (
log, log,
) )
@ -84,7 +89,7 @@ def diff_history(
async def start_backfill( async def start_backfill(
mod: ModuleType, mod: ModuleType,
bfqsn: str, mkt: MktPair,
shm: ShmArray, shm: ShmArray,
timeframe: float, timeframe: float,
sampler_stream: tractor.MsgStream, sampler_stream: tractor.MsgStream,
@ -104,7 +109,11 @@ async def start_backfill(
tuple[np.ndarray, str] tuple[np.ndarray, str]
] ]
config: dict[str, int] config: dict[str, int]
async with mod.open_history_client(bfqsn) as (hist, config):
bs_fqme: str = mkt.bs_fqme
async with mod.open_history_client(
bs_fqme,
) as (hist, config):
# get latest query's worth of history all the way # get latest query's worth of history all the way
# back to what is recorded in the tsdb # back to what is recorded in the tsdb
@ -134,7 +143,7 @@ async def start_backfill(
surr = array[-6:] surr = array[-6:]
diff_in_mins = round(diff/60., ndigits=2) diff_in_mins = round(diff/60., ndigits=2)
log.warning( log.warning(
f'STEP ERROR `{bfqsn}` for period {step_size_s}s:\n' f'STEP ERROR `{bs_fqme}` for period {step_size_s}s:\n'
f'Off by `{diff}` seconds (or `{diff_in_mins}` mins)\n' f'Off by `{diff}` seconds (or `{diff_in_mins}` mins)\n'
'Surrounding 6 time stamps:\n' 'Surrounding 6 time stamps:\n'
f'{list(surr["time"])}\n' f'{list(surr["time"])}\n'
@ -161,7 +170,7 @@ async def start_backfill(
shm.push(to_push, prepend=True) shm.push(to_push, prepend=True)
# TODO: *** THIS IS A BUG *** # TODO: *** THIS IS A BUG ***
# we need to only broadcast to subscribers for this fqsn.. # we need to only broadcast to subscribers for this fqme..
# otherwise all fsps get reset on every chart.. # otherwise all fsps get reset on every chart..
await sampler_stream.send('broadcast_all') await sampler_stream.send('broadcast_all')
@ -248,7 +257,7 @@ async def start_backfill(
): ):
start_dt = min(starts) start_dt = min(starts)
log.warning( log.warning(
f"{bfqsn}: skipping duplicate frame @ {next_start_dt}" f"{bs_fqme}: skipping duplicate frame @ {next_start_dt}"
) )
starts[start_dt] += 1 starts[start_dt] += 1
continue continue
@ -321,7 +330,7 @@ async def start_backfill(
f'{start_dt} -> {end_dt}' f'{start_dt} -> {end_dt}'
) )
await storage.write_ohlcv( await storage.write_ohlcv(
f'{bfqsn}.{mod.name}', # lul.. f'{mkt.fqme}',
to_push, to_push,
timeframe, timeframe,
) )
@ -342,7 +351,7 @@ async def start_backfill(
async def basic_backfill( async def basic_backfill(
bus: _FeedsBus, bus: _FeedsBus,
mod: ModuleType, mod: ModuleType,
bfqsn: str, mkt: MktPair,
shms: dict[int, ShmArray], shms: dict[int, ShmArray],
sampler_stream: tractor.MsgStream, sampler_stream: tractor.MsgStream,
feed_is_live: trio.Event, feed_is_live: trio.Event,
@ -361,7 +370,7 @@ async def basic_backfill(
partial( partial(
start_backfill, start_backfill,
mod, mod,
bfqsn, mkt,
shm, shm,
timeframe, timeframe,
sampler_stream, sampler_stream,
@ -378,8 +387,7 @@ async def tsdb_backfill(
marketstore: ModuleType, marketstore: ModuleType,
bus: _FeedsBus, bus: _FeedsBus,
storage: Storage, storage: Storage,
fqsn: str, mkt: MktPair,
bfqsn: str,
shms: dict[int, ShmArray], shms: dict[int, ShmArray],
sampler_stream: tractor.MsgStream, sampler_stream: tractor.MsgStream,
feed_is_live: trio.Event, feed_is_live: trio.Event,
@ -393,17 +401,17 @@ async def tsdb_backfill(
# TODO: this should be used verbatim for the pure # TODO: this should be used verbatim for the pure
# shm backfiller approach below. # shm backfiller approach below.
dts_per_tf: dict[int, datetime] = {} dts_per_tf: dict[int, datetime] = {}
fqme: str = mkt.fqme
# start history anal and load missing new data via backend. # start history anal and load missing new data via backend.
for timeframe, shm in shms.items(): for timeframe, shm in shms.items():
# loads a (large) frame of data from the tsdb depending # loads a (large) frame of data from the tsdb depending
# on the db's query size limit. # on the db's query size limit.
tsdb_history, first_tsdb_dt, last_tsdb_dt = await storage.load( tsdb_history, first_tsdb_dt, last_tsdb_dt = await storage.load(
fqsn, fqme,
timeframe=timeframe, timeframe=timeframe,
) )
broker, *_ = unpack_fqme(fqsn)
try: try:
( (
latest_start_dt, latest_start_dt,
@ -413,7 +421,7 @@ async def tsdb_backfill(
partial( partial(
start_backfill, start_backfill,
mod, mod,
bfqsn, mkt,
shm, shm,
timeframe, timeframe,
sampler_stream, sampler_stream,
@ -541,7 +549,7 @@ async def tsdb_backfill(
while shm._first.value > 0: while shm._first.value > 0:
tsdb_history = await storage.read_ohlcv( tsdb_history = await storage.read_ohlcv(
fqsn, fqme,
timeframe=timeframe, timeframe=timeframe,
end=tsdb_last_frame_start, end=tsdb_last_frame_start,
) )
@ -599,7 +607,7 @@ async def tsdb_backfill(
async def manage_history( async def manage_history(
mod: ModuleType, mod: ModuleType,
bus: _FeedsBus, bus: _FeedsBus,
fqsn: str, mkt: MktPair,
some_data_ready: trio.Event, some_data_ready: trio.Event,
feed_is_live: trio.Event, feed_is_live: trio.Event,
timeframe: float = 60, # in seconds timeframe: float = 60, # in seconds
@ -628,11 +636,12 @@ async def manage_history(
name, uuid = uid name, uuid = uid
service = name.rstrip(f'.{mod.name}') service = name.rstrip(f'.{mod.name}')
fqme: str = mkt.fqme
# (maybe) allocate shm array for this broker/symbol which will # (maybe) allocate shm array for this broker/symbol which will
# be used for fast near-term history capture and processing. # be used for fast near-term history capture and processing.
hist_shm, opened = maybe_open_shm_array( hist_shm, opened = maybe_open_shm_array(
# key=f'{fqsn}_hist_p{port}', key=f'piker.{service}[{uuid[:16]}.{fqme}.hist',
key=f'piker.{service}[{uuid[:16]}.{fqsn}.hist',
# use any broker defined ohlc dtype: # use any broker defined ohlc dtype:
dtype=getattr(mod, '_ohlc_dtype', base_iohlc_dtype), dtype=getattr(mod, '_ohlc_dtype', base_iohlc_dtype),
@ -649,9 +658,7 @@ async def manage_history(
) )
rt_shm, opened = maybe_open_shm_array( rt_shm, opened = maybe_open_shm_array(
# key=f'{fqsn}_rt_p{port}', key=f'piker.{service}[{uuid[:16]}.{fqme}.rt',
# key=f'piker.{service}.{fqsn}_rt.{uuid}',
key=f'piker.{service}[{uuid[:16]}.{fqsn}.rt',
# use any broker defined ohlc dtype: # use any broker defined ohlc dtype:
dtype=getattr(mod, '_ohlc_dtype', base_iohlc_dtype), dtype=getattr(mod, '_ohlc_dtype', base_iohlc_dtype),
@ -691,23 +698,47 @@ async def manage_history(
) as sample_stream: ) as sample_stream:
log.info('Scanning for existing `marketstored`') open_history_client = getattr(
tsdb_is_up = await check_for_service('marketstored') mod,
'open_history_client',
bfqsn = fqsn.replace('.' + mod.name, '') None,
open_history_client = getattr(mod, 'open_history_client', None) )
assert open_history_client assert open_history_client
conf, path = config.load('conf')
tsdbconf = conf['network'].get('tsdb')
# lookup backend tsdb module by name and load any user service
# settings for connecting to the tsdb service.
tsdb_backend: str = tsdbconf.pop('backend')
tsdb_host: str = tsdbconf['host']
# TODO: import and load storagemod by name
# mod = get_storagemod(tsdb_backend)
from ..service import marketstore
tsdb_is_up: bool = False
try_remote_tsdb: bool = False
if tsdb_host == 'localhost':
log.info('Scanning for existing `{tsbd_backend}`')
tsdb_is_up: bool = await check_for_service(f'{tsdb_backend}d')
else:
try_remote_tsdb: bool = True
if ( if (
tsdb_is_up tsdb_is_up
and opened or try_remote_tsdb
and open_history_client and (
opened
and open_history_client
)
): ):
log.info('Found existing `marketstored`') log.info('Found existing `marketstored`')
from ..service import marketstore
async with ( async with (
marketstore.open_storage_client(fqsn)as storage, marketstore.open_storage_client(
**tsdbconf
) as storage,
): ):
# TODO: drop returning the output that we pass in? # TODO: drop returning the output that we pass in?
await bus.nursery.start( await bus.nursery.start(
@ -716,8 +747,7 @@ async def manage_history(
marketstore, marketstore,
bus, bus,
storage, storage,
fqsn, mkt,
bfqsn,
{ {
1: rt_shm, 1: rt_shm,
60: hist_shm, 60: hist_shm,
@ -752,7 +782,7 @@ async def manage_history(
await basic_backfill( await basic_backfill(
bus, bus,
mod, mod,
bfqsn, mkt,
{ {
1: rt_shm, 1: rt_shm,
60: hist_shm, 60: hist_shm,