From 9d04accf2e9e9e1a42837ff62de2fa8ed23f12ae Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 20 Apr 2023 13:36:52 -0400 Subject: [PATCH] Factor out all history mgmt-logic into a new `.data.history` --- piker/data/feed.py | 738 +--------------------------------------- piker/data/history.py | 770 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 780 insertions(+), 728 deletions(-) create mode 100644 piker/data/history.py diff --git a/piker/data/feed.py b/piker/data/feed.py index 51f1275b..02f0adec 100644 --- a/piker/data/feed.py +++ b/piker/data/feed.py @@ -14,31 +14,31 @@ # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . -""" +''' Data feed apis and infra. -This module is enabled for ``brokerd`` daemons. +This module is enabled for ``brokerd`` daemons and includes mostly +endpoints and middleware to support our real-time, provider agnostic, +live market quotes layer. Historical data loading and processing is also +initiated in parts of the feed bus startup but business logic and +functionality is generally located in the sibling `.data.history` +module. -""" +''' from __future__ import annotations from collections import ( defaultdict, - Counter, ) from contextlib import asynccontextmanager as acm -# from decimal import Decimal -from datetime import datetime from functools import partial import time from types import ModuleType from typing import ( Any, AsyncContextManager, - Callable, Optional, Awaitable, Sequence, - TYPE_CHECKING, Union, ) @@ -50,8 +50,6 @@ from tractor.trionics import ( maybe_open_context, gather_contexts, ) -import pendulum -import numpy as np from ..brokers import get_brokermod from ..calc import humanize @@ -61,17 +59,14 @@ from ._util import ( ) from ..service import ( maybe_spawn_brokerd, - check_for_service, ) from .flows import Flume from .validate import ( FeedInit, validate_backend, ) -from ._sharedmem import ( - maybe_open_shm_array, - ShmArray, - _secs_in_day, +from .history import ( + manage_history, ) from .ingest import get_ingestormod from .types import Struct @@ -79,19 +74,11 @@ from ..accounting._mktinfo import ( MktPair, unpack_fqme, ) -from ._source import base_iohlc_dtype from ..ui import _search from ._sampling import ( - open_sample_stream, sample_and_broadcast, uniform_rate_send, ) -from ..brokers._util import ( - DataUnavailable, -) - -if TYPE_CHECKING: - from ..service.marketstore import Storage class _FeedsBus(Struct): @@ -230,711 +217,6 @@ def get_feed_bus( return _bus -def diff_history( - array: np.ndarray, - timeframe: int, - start_dt: datetime, - end_dt: datetime, - last_tsdb_dt: datetime | None = None - -) -> np.ndarray: - - # no diffing with tsdb dt index possible.. 
- if last_tsdb_dt is None: - return array - - time = array['time'] - return array[time > last_tsdb_dt.timestamp()] - - -async def start_backfill( - mod: ModuleType, - bfqsn: str, - shm: ShmArray, - timeframe: float, - sampler_stream: tractor.MsgStream, - feed_is_live: trio.Event, - - last_tsdb_dt: Optional[datetime] = None, - storage: Optional[Storage] = None, - write_tsdb: bool = True, - tsdb_is_up: bool = False, - - task_status: TaskStatus[tuple] = trio.TASK_STATUS_IGNORED, - -) -> int: - - hist: Callable[ - [int, datetime, datetime], - tuple[np.ndarray, str] - ] - config: dict[str, int] - async with mod.open_history_client(bfqsn) as (hist, config): - - # get latest query's worth of history all the way - # back to what is recorded in the tsdb - array, start_dt, end_dt = await hist( - timeframe, - end_dt=None, - ) - times = array['time'] - - # sample period step size in seconds - step_size_s = ( - pendulum.from_timestamp(times[-1]) - - pendulum.from_timestamp(times[-2]) - ).seconds - - # if the market is open (aka we have a live feed) but the - # history sample step index seems off we report the surrounding - # data and drop into a bp. this case shouldn't really ever - # happen if we're doing history retrieval correctly. - if ( - step_size_s == 60 - and feed_is_live.is_set() - ): - inow = round(time.time()) - diff = inow - times[-1] - if abs(diff) > 60: - surr = array[-6:] - diff_in_mins = round(diff/60., ndigits=2) - log.warning( - f'STEP ERROR `{bfqsn}` for period {step_size_s}s:\n' - f'Off by `{diff}` seconds (or `{diff_in_mins}` mins)\n' - 'Surrounding 6 time stamps:\n' - f'{list(surr["time"])}\n' - 'Here is surrounding 6 samples:\n' - f'{surr}\nn' - ) - - # uncomment this for a hacker who wants to investigate - # this case manually.. - # await tractor.breakpoint() - - # frame's worth of sample-period-steps, in seconds - frame_size_s = len(array) * step_size_s - - to_push = diff_history( - array, - timeframe, - start_dt, - end_dt, - last_tsdb_dt=last_tsdb_dt, - ) - - log.info(f'Pushing {to_push.size} to shm!') - shm.push(to_push, prepend=True) - - # TODO: *** THIS IS A BUG *** - # we need to only broadcast to subscribers for this fqsn.. - # otherwise all fsps get reset on every chart.. - await sampler_stream.send('broadcast_all') - - # signal that backfilling to tsdb's end datum is complete - bf_done = trio.Event() - - # let caller unblock and deliver latest history frame - task_status.started(( - start_dt, - end_dt, - bf_done, - )) - - # based on the sample step size, maybe load a certain amount history - if last_tsdb_dt is None: - if step_size_s not in (1, 60): - raise ValueError( - '`piker` only needs to support 1m and 1s sampling ' - 'but ur api is trying to deliver a longer ' - f'timeframe of {step_size_s} seconds..\n' - 'So yuh.. dun do dat brudder.' - ) - - # when no tsdb "last datum" is provided, we just load - # some near-term history. - periods = { - 1: {'days': 1}, - 60: {'days': 14}, - } - - if tsdb_is_up: - # do a decently sized backfill and load it into storage. - periods = { - 1: {'days': 6}, - 60: {'years': 6}, - } - - period_duration = periods[step_size_s] - - # NOTE: manually set the "latest" datetime which we intend to - # backfill history "until" so as to adhere to the history - # settings above when the tsdb is detected as being empty. - last_tsdb_dt = start_dt.subtract(**period_duration) - - # configure async query throttling - # rate = config.get('rate', 1) - # XXX: legacy from ``trimeter`` code but unsupported now. 
- # erlangs = config.get('erlangs', 1) - - # avoid duplicate history frames with a set of datetime frame - # starts and associated counts of how many duplicates we see - # per time stamp. - starts: Counter[datetime] = Counter() - - # inline sequential loop where we simply pass the - # last retrieved start dt to the next request as - # it's end dt. - while end_dt > last_tsdb_dt: - log.debug( - f'Requesting {step_size_s}s frame ending in {start_dt}' - ) - - try: - array, next_start_dt, end_dt = await hist( - timeframe, - end_dt=start_dt, - ) - - # broker says there never was or is no more history to pull - except DataUnavailable: - log.warning( - f'NO-MORE-DATA: backend {mod.name} halted history!?' - ) - - # ugh, what's a better way? - # TODO: fwiw, we probably want a way to signal a throttle - # condition (eg. with ib) so that we can halt the - # request loop until the condition is resolved? - return - - if ( - next_start_dt in starts - and starts[next_start_dt] <= 6 - ): - start_dt = min(starts) - log.warning( - f"{bfqsn}: skipping duplicate frame @ {next_start_dt}" - ) - starts[start_dt] += 1 - continue - - elif starts[next_start_dt] > 6: - log.warning( - f'NO-MORE-DATA: backend {mod.name} before {next_start_dt}?' - ) - return - - # only update new start point if not-yet-seen - start_dt = next_start_dt - starts[start_dt] += 1 - - assert array['time'][0] == start_dt.timestamp() - - diff = end_dt - start_dt - frame_time_diff_s = diff.seconds - expected_frame_size_s = frame_size_s + step_size_s - - if frame_time_diff_s > expected_frame_size_s: - - # XXX: query result includes a start point prior to our - # expected "frame size" and thus is likely some kind of - # history gap (eg. market closed period, outage, etc.) - # so just report it to console for now. - log.warning( - f'History frame ending @ {end_dt} appears to have a gap:\n' - f'{diff} ~= {frame_time_diff_s} seconds' - ) - - to_push = diff_history( - array, - timeframe, - start_dt, - end_dt, - last_tsdb_dt=last_tsdb_dt, - ) - ln = len(to_push) - if ln: - log.info(f'{ln} bars for {start_dt} -> {end_dt}') - - else: - log.warning( - f'{ln} BARS TO PUSH after diff?!: {start_dt} -> {end_dt}' - ) - - # bail gracefully on shm allocation overrun/full condition - try: - shm.push(to_push, prepend=True) - except ValueError: - log.info( - f'Shm buffer overrun on: {start_dt} -> {end_dt}?' - ) - # can't push the entire frame? so - # push only the amount that can fit.. - break - - log.info( - f'Shm pushed {ln} frame:\n' - f'{start_dt} -> {end_dt}' - ) - - if ( - storage is not None - and write_tsdb - ): - log.info( - f'Writing {ln} frame to storage:\n' - f'{start_dt} -> {end_dt}' - ) - await storage.write_ohlcv( - f'{bfqsn}.{mod.name}', # lul.. - to_push, - timeframe, - ) - - # TODO: can we only trigger this if the respective - # history in "in view"?!? - - # XXX: extremely important, there can be no checkpoints - # in the block above to avoid entering new ``frames`` - # values while we're pipelining the current ones to - # memory... - await sampler_stream.send('broadcast_all') - - # short-circuit (for now) - bf_done.set() - - -async def basic_backfill( - bus: _FeedsBus, - mod: ModuleType, - bfqsn: str, - shms: dict[int, ShmArray], - sampler_stream: tractor.MsgStream, - feed_is_live: trio.Event, - -) -> None: - - # do a legacy incremental backfill from the provider. 
- log.info('No TSDB (marketstored) found, doing basic backfill..') - - # start history backfill task ``backfill_bars()`` is - # a required backend func this must block until shm is - # filled with first set of ohlc bars - for timeframe, shm in shms.items(): - try: - await bus.nursery.start( - partial( - start_backfill, - mod, - bfqsn, - shm, - timeframe, - sampler_stream, - feed_is_live, - ) - ) - except DataUnavailable: - # XXX: timeframe not supported for backend - continue - - -async def tsdb_backfill( - mod: ModuleType, - marketstore: ModuleType, - bus: _FeedsBus, - storage: Storage, - fqsn: str, - bfqsn: str, - shms: dict[int, ShmArray], - sampler_stream: tractor.MsgStream, - feed_is_live: trio.Event, - - task_status: TaskStatus[ - tuple[ShmArray, ShmArray] - ] = trio.TASK_STATUS_IGNORED, - -) -> None: - - # TODO: this should be used verbatim for the pure - # shm backfiller approach below. - dts_per_tf: dict[int, datetime] = {} - - # start history anal and load missing new data via backend. - for timeframe, shm in shms.items(): - # loads a (large) frame of data from the tsdb depending - # on the db's query size limit. - tsdb_history, first_tsdb_dt, last_tsdb_dt = await storage.load( - fqsn, - timeframe=timeframe, - ) - - broker, *_ = unpack_fqme(fqsn) - try: - ( - latest_start_dt, - latest_end_dt, - bf_done, - ) = await bus.nursery.start( - partial( - start_backfill, - mod, - bfqsn, - shm, - timeframe, - sampler_stream, - feed_is_live, - - last_tsdb_dt=last_tsdb_dt, - tsdb_is_up=True, - storage=storage, - ) - ) - except DataUnavailable: - # XXX: timeframe not supported for backend - dts_per_tf[timeframe] = ( - tsdb_history, - last_tsdb_dt, - None, - None, - None, - ) - continue - - # tsdb_history = series.get(timeframe) - dts_per_tf[timeframe] = ( - tsdb_history, - last_tsdb_dt, - latest_start_dt, - latest_end_dt, - bf_done, - ) - - # if len(hist_shm.array) < 2: - # TODO: there's an edge case here to solve where if the last - # frame before market close (at least on ib) was pushed and - # there was only "1 new" row pushed from the first backfill - # query-iteration, then the sample step sizing calcs will - # break upstream from here since you can't diff on at least - # 2 steps... probably should also add logic to compute from - # the tsdb series and stash that somewhere as meta data on - # the shm buffer?.. no se. - - # unblock the feed bus management task - # assert len(shms[1].array) - task_status.started() - - async def back_load_from_tsdb( - timeframe: int, - shm: ShmArray, - ): - ( - tsdb_history, - last_tsdb_dt, - latest_start_dt, - latest_end_dt, - bf_done, - ) = dts_per_tf[timeframe] - - # sync to backend history task's query/load completion - if bf_done: - await bf_done.wait() - - # TODO: eventually it'd be nice to not require a shm array/buffer - # to accomplish this.. maybe we can do some kind of tsdb direct to - # graphics format eventually in a child-actor? - - # TODO: see if there's faster multi-field reads: - # https://numpy.org/doc/stable/user/basics.rec.html#accessing-multiple-fields - # re-index with a `time` and index field - prepend_start = shm._first.value - array = shm.array - if len(array): - shm_last_dt = pendulum.from_timestamp(shm.array[0]['time']) - else: - shm_last_dt = None - - if last_tsdb_dt: - assert shm_last_dt >= last_tsdb_dt - - # do diff against start index of last frame of history and only - # fill in an amount of datums from tsdb allows for most recent - # to be loaded into mem *before* tsdb data. 
- if ( - last_tsdb_dt - and latest_start_dt - ): - backfilled_size_s = ( - latest_start_dt - last_tsdb_dt - ).seconds - # if the shm buffer len is not large enough to contain - # all missing data between the most recent backend-queried frame - # and the most recent dt-index in the db we warn that we only - # want to load a portion of the next tsdb query to fill that - # space. - log.info( - f'{backfilled_size_s} seconds worth of {timeframe}s loaded' - ) - - # Load TSDB history into shm buffer (for display) if there is - # remaining buffer space. - - if ( - len(tsdb_history) - ): - # load the first (smaller) bit of history originally loaded - # above from ``Storage.load()``. - to_push = tsdb_history[-prepend_start:] - shm.push( - to_push, - - # insert the history pre a "days worth" of samples - # to leave some real-time buffer space at the end. - prepend=True, - # update_first=False, - # start=prepend_start, - field_map=marketstore.ohlc_key_map, - ) - - tsdb_last_frame_start = tsdb_history['Epoch'][0] - - if timeframe == 1: - times = shm.array['time'] - assert (times[1] - times[0]) == 1 - - # load as much from storage into shm possible (depends on - # user's shm size settings). - while shm._first.value > 0: - - tsdb_history = await storage.read_ohlcv( - fqsn, - timeframe=timeframe, - end=tsdb_last_frame_start, - ) - - # empty query - if not len(tsdb_history): - break - - next_start = tsdb_history['Epoch'][0] - if next_start >= tsdb_last_frame_start: - # no earlier data detected - break - else: - tsdb_last_frame_start = next_start - - prepend_start = shm._first.value - to_push = tsdb_history[-prepend_start:] - - # insert the history pre a "days worth" of samples - # to leave some real-time buffer space at the end. - shm.push( - to_push, - prepend=True, - field_map=marketstore.ohlc_key_map, - ) - log.info(f'Loaded {to_push.shape} datums from storage') - - # manually trigger step update to update charts/fsps - # which need an incremental update. - # NOTE: the way this works is super duper - # un-intuitive right now: - # - the broadcaster fires a msg to the fsp subsystem. - # - fsp subsys then checks for a sample step diff and - # possibly recomputes prepended history. - # - the fsp then sends back to the parent actor - # (usually a chart showing graphics for said fsp) - # which tells the chart to conduct a manual full - # graphics loop cycle. - await sampler_stream.send('broadcast_all') - - # TODO: write new data to tsdb to be ready to for next read. - - # backload from db (concurrently per timeframe) once backfilling of - # recent dat a loaded from the backend provider (see - # ``bf_done.wait()`` call). - async with trio.open_nursery() as nurse: - for timeframe, shm in shms.items(): - nurse.start_soon( - back_load_from_tsdb, - timeframe, - shm, - ) - - -async def manage_history( - mod: ModuleType, - bus: _FeedsBus, - fqsn: str, - some_data_ready: trio.Event, - feed_is_live: trio.Event, - timeframe: float = 60, # in seconds - - task_status: TaskStatus[ - tuple[ShmArray, ShmArray] - ] = trio.TASK_STATUS_IGNORED, - -) -> None: - ''' - Load and manage historical data including the loading of any - available series from `marketstore` as well as conducting real-time - update of both that existing db and the allocated shared memory - buffer. - - ''' - - # TODO: is there a way to make each shm file key - # actor-tree-discovery-addr unique so we avoid collisions - # when doing tests which also allocate shms for certain instruments - # that may be in use on the system by some other running daemons? 
- # from tractor._state import _runtime_vars - # port = _runtime_vars['_root_mailbox'][1] - - uid = tractor.current_actor().uid - name, uuid = uid - service = name.rstrip(f'.{mod.name}') - - # (maybe) allocate shm array for this broker/symbol which will - # be used for fast near-term history capture and processing. - hist_shm, opened = maybe_open_shm_array( - # key=f'{fqsn}_hist_p{port}', - key=f'piker.{service}[{uuid[:16]}.{fqsn}.hist', - - # use any broker defined ohlc dtype: - dtype=getattr(mod, '_ohlc_dtype', base_iohlc_dtype), - - # we expect the sub-actor to write - readonly=False, - ) - hist_zero_index = hist_shm.index - 1 - - # TODO: history validation - if not opened: - raise RuntimeError( - "Persistent shm for sym was already open?!" - ) - - rt_shm, opened = maybe_open_shm_array( - # key=f'{fqsn}_rt_p{port}', - # key=f'piker.{service}.{fqsn}_rt.{uuid}', - key=f'piker.{service}[{uuid[:16]}.{fqsn}.rt', - - # use any broker defined ohlc dtype: - dtype=getattr(mod, '_ohlc_dtype', base_iohlc_dtype), - - # we expect the sub-actor to write - readonly=False, - size=3*_secs_in_day, - ) - - # (for now) set the rt (hft) shm array with space to prepend - # only a few days worth of 1s history. - days = 2 - start_index = days*_secs_in_day - rt_shm._first.value = start_index - rt_shm._last.value = start_index - rt_zero_index = rt_shm.index - 1 - - if not opened: - raise RuntimeError( - "Persistent shm for sym was already open?!" - ) - - # register 1s and 1m buffers with the global incrementer task - async with open_sample_stream( - period_s=1., - shms_by_period={ - 1.: rt_shm.token, - 60.: hist_shm.token, - }, - - # NOTE: we want to only open a stream for doing broadcasts on - # backfill operations, not receive the sample index-stream - # (since there's no code in this data feed layer that needs to - # consume it). - open_index_stream=True, - sub_for_broadcasts=False, - - ) as sample_stream: - - log.info('Scanning for existing `marketstored`') - tsdb_is_up = await check_for_service('marketstored') - - bfqsn = fqsn.replace('.' + mod.name, '') - open_history_client = getattr(mod, 'open_history_client', None) - assert open_history_client - - if ( - tsdb_is_up - and opened - and open_history_client - ): - log.info('Found existing `marketstored`') - - from ..service import marketstore - async with ( - marketstore.open_storage_client(fqsn)as storage, - ): - # TODO: drop returning the output that we pass in? - await bus.nursery.start( - tsdb_backfill, - mod, - marketstore, - bus, - storage, - fqsn, - bfqsn, - { - 1: rt_shm, - 60: hist_shm, - }, - sample_stream, - feed_is_live, - ) - - # yield back after client connect with filled shm - task_status.started(( - hist_zero_index, - hist_shm, - rt_zero_index, - rt_shm, - )) - - # indicate to caller that feed can be delivered to - # remote requesting client since we've loaded history - # data that can be used. - some_data_ready.set() - - # history retreival loop depending on user interaction - # and thus a small RPC-prot for remotely controllinlg - # what data is loaded for viewing. 
- await trio.sleep_forever() - - # load less history if no tsdb can be found - elif ( - not tsdb_is_up - and opened - ): - await basic_backfill( - bus, - mod, - bfqsn, - { - 1: rt_shm, - 60: hist_shm, - }, - sample_stream, - feed_is_live, - ) - task_status.started(( - hist_zero_index, - hist_shm, - rt_zero_index, - rt_shm, - )) - some_data_ready.set() - await trio.sleep_forever() - - async def allocate_persistent_feed( bus: _FeedsBus, sub_registered: trio.Event, diff --git a/piker/data/history.py b/piker/data/history.py new file mode 100644 index 00000000..3e0a3a62 --- /dev/null +++ b/piker/data/history.py @@ -0,0 +1,770 @@ +# piker: trading gear for hackers +# Copyright (C) Tyler Goodlet (in stewardship for pikers) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +''' +Historical data business logic for load, backfill and tsdb storage. + +''' +from __future__ import annotations +from collections import ( + Counter, +) +from datetime import datetime +from functools import partial +import time +from types import ModuleType +from typing import ( + Callable, + Optional, + TYPE_CHECKING, +) + +import trio +from trio_typing import TaskStatus +import tractor +import pendulum +import numpy as np + +from ._util import ( + log, +) +from ..service import ( + check_for_service, +) +from ._sharedmem import ( + maybe_open_shm_array, + ShmArray, + _secs_in_day, +) +from ..accounting._mktinfo import ( + unpack_fqme, +) +from ._source import base_iohlc_dtype +from ._sampling import ( + open_sample_stream, +) +from ..brokers._util import ( + DataUnavailable, +) + +if TYPE_CHECKING: + from ..service.marketstore import Storage + from .feed import _FeedsBus + + +def diff_history( + array: np.ndarray, + timeframe: int, + start_dt: datetime, + end_dt: datetime, + last_tsdb_dt: datetime | None = None + +) -> np.ndarray: + + # no diffing with tsdb dt index possible.. 
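+    # NOTE (editorial sketch): the filter below is a plain boolean-mask
+    # slice over the record array's epoch `time` column, eg. with an
+    # illustrative `last_tsdb_dt = pendulum.datetime(2023, 4, 20)` only
+    # rows satisfying `time > last_tsdb_dt.timestamp()` survive.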
+    if last_tsdb_dt is None:
+        return array
+
+    time = array['time']
+    return array[time > last_tsdb_dt.timestamp()]
+
+
+async def start_backfill(
+    mod: ModuleType,
+    bfqsn: str,
+    shm: ShmArray,
+    timeframe: float,
+    sampler_stream: tractor.MsgStream,
+    feed_is_live: trio.Event,
+
+    last_tsdb_dt: Optional[datetime] = None,
+    storage: Optional[Storage] = None,
+    write_tsdb: bool = True,
+    tsdb_is_up: bool = False,
+
+    task_status: TaskStatus[tuple] = trio.TASK_STATUS_IGNORED,
+
+) -> int:
+
+    hist: Callable[
+        [int, datetime, datetime],
+        tuple[np.ndarray, str]
+    ]
+    config: dict[str, int]
+    async with mod.open_history_client(bfqsn) as (hist, config):
+
+        # get latest query's worth of history all the way
+        # back to what is recorded in the tsdb
+        array, start_dt, end_dt = await hist(
+            timeframe,
+            end_dt=None,
+        )
+        times = array['time']
+
+        # sample period step size in seconds
+        step_size_s = (
+            pendulum.from_timestamp(times[-1])
+            - pendulum.from_timestamp(times[-2])
+        ).seconds
+
+        # if the market is open (aka we have a live feed) but the
+        # history sample step index seems off we report the surrounding
+        # data and drop into a bp. this case shouldn't really ever
+        # happen if we're doing history retrieval correctly.
+        if (
+            step_size_s == 60
+            and feed_is_live.is_set()
+        ):
+            inow = round(time.time())
+            diff = inow - times[-1]
+            if abs(diff) > 60:
+                surr = array[-6:]
+                diff_in_mins = round(diff/60., ndigits=2)
+                log.warning(
+                    f'STEP ERROR `{bfqsn}` for period {step_size_s}s:\n'
+                    f'Off by `{diff}` seconds (or `{diff_in_mins}` mins)\n'
+                    'Surrounding 6 time stamps:\n'
+                    f'{list(surr["time"])}\n'
+                    'Here are the surrounding 6 samples:\n'
+                    f'{surr}\n'
+                )
+
+                # uncomment this for a hacker who wants to investigate
+                # this case manually..
+                # await tractor.breakpoint()
+
+        # frame's worth of sample-period-steps, in seconds
+        frame_size_s = len(array) * step_size_s
+
+        to_push = diff_history(
+            array,
+            timeframe,
+            start_dt,
+            end_dt,
+            last_tsdb_dt=last_tsdb_dt,
+        )
+
+        log.info(f'Pushing {to_push.size} to shm!')
+        shm.push(to_push, prepend=True)
+
+        # TODO: *** THIS IS A BUG ***
+        # we need to only broadcast to subscribers for this fqsn..
+        # otherwise all fsps get reset on every chart..
+        await sampler_stream.send('broadcast_all')
+
+        # signal that backfilling to tsdb's end datum is complete
+        bf_done = trio.Event()
+
+        # let caller unblock and deliver latest history frame
+        task_status.started((
+            start_dt,
+            end_dt,
+            bf_done,
+        ))
+
+        # based on the sample step size, maybe load a certain amount of
+        # history.
+        if last_tsdb_dt is None:
+            if step_size_s not in (1, 60):
+                raise ValueError(
+                    '`piker` only needs to support 1m and 1s sampling '
+                    'but ur api is trying to deliver a longer '
+                    f'timeframe of {step_size_s} seconds..\n'
+                    'So yuh.. dun do dat brudder.'
+                )
+
+            # when no tsdb "last datum" is provided, we just load
+            # some near-term history.
+            periods = {
+                1: {'days': 1},
+                60: {'days': 14},
+            }
+
+            if tsdb_is_up:
+                # do a decently sized backfill and load it into storage.
+                periods = {
+                    1: {'days': 6},
+                    60: {'years': 6},
+                }
+
+            period_duration = periods[step_size_s]
+
+            # NOTE: manually set the "latest" datetime which we intend to
+            # backfill history "until" so as to adhere to the history
+            # settings above when the tsdb is detected as being empty.
+            last_tsdb_dt = start_dt.subtract(**period_duration)
+
+        # configure async query throttling
+        # rate = config.get('rate', 1)
+        # XXX: legacy from ``trimeter`` code but unsupported now.
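+        # NOTE (editorial sketch): if request throttling is ever
+        # reintroduced, a `trio.CapacityLimiter` is one idiomatic
+        # stand-in for the old ``trimeter`` erlangs/rate config, eg:
+        #
+        #   limiter = trio.CapacityLimiter(config.get('erlangs', 1))
+        #   async with limiter:
+        #       array, next_start_dt, end_dt = await hist(...)
+        #
+        # names above are illustrative only, not part of this change.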
+        # erlangs = config.get('erlangs', 1)
+
+        # avoid duplicate history frames with a set of datetime frame
+        # starts and associated counts of how many duplicates we see
+        # per time stamp.
+        starts: Counter[datetime] = Counter()
+
+        # inline sequential loop where we simply pass the
+        # last retrieved start dt to the next request as
+        # its end dt.
+        while end_dt > last_tsdb_dt:
+            log.debug(
+                f'Requesting {step_size_s}s frame ending in {start_dt}'
+            )
+
+            try:
+                array, next_start_dt, end_dt = await hist(
+                    timeframe,
+                    end_dt=start_dt,
+                )
+
+            # broker says there either never was any history
+            # or there is no more to pull
+            except DataUnavailable:
+                log.warning(
+                    f'NO-MORE-DATA: backend {mod.name} halted history!?'
+                )
+
+                # ugh, what's a better way?
+                # TODO: fwiw, we probably want a way to signal a throttle
+                # condition (eg. with ib) so that we can halt the
+                # request loop until the condition is resolved?
+                return
+
+            if (
+                next_start_dt in starts
+                and starts[next_start_dt] <= 6
+            ):
+                start_dt = min(starts)
+                log.warning(
+                    f"{bfqsn}: skipping duplicate frame @ {next_start_dt}"
+                )
+                starts[start_dt] += 1
+                continue
+
+            elif starts[next_start_dt] > 6:
+                log.warning(
+                    f'NO-MORE-DATA: backend {mod.name} before {next_start_dt}?'
+                )
+                return
+
+            # only update new start point if not-yet-seen
+            start_dt = next_start_dt
+            starts[start_dt] += 1
+
+            assert array['time'][0] == start_dt.timestamp()
+
+            diff = end_dt - start_dt
+            frame_time_diff_s = diff.seconds
+            expected_frame_size_s = frame_size_s + step_size_s
+
+            if frame_time_diff_s > expected_frame_size_s:
+
+                # XXX: query result includes a start point prior to our
+                # expected "frame size" and thus is likely some kind of
+                # history gap (eg. market closed period, outage, etc.)
+                # so just report it to console for now.
+                log.warning(
+                    f'History frame ending @ {end_dt} appears to have a gap:\n'
+                    f'{diff} ~= {frame_time_diff_s} seconds'
+                )
+
+            to_push = diff_history(
+                array,
+                timeframe,
+                start_dt,
+                end_dt,
+                last_tsdb_dt=last_tsdb_dt,
+            )
+            ln = len(to_push)
+            if ln:
+                log.info(f'{ln} bars for {start_dt} -> {end_dt}')
+
+            else:
+                log.warning(
+                    f'{ln} BARS TO PUSH after diff?!: {start_dt} -> {end_dt}'
+                )
+
+            # bail gracefully on shm allocation overrun/full condition
+            try:
+                shm.push(to_push, prepend=True)
+            except ValueError:
+                log.info(
+                    f'Shm buffer overrun on: {start_dt} -> {end_dt}?'
+                )
+                # can't push the entire frame? so
+                # push only the amount that can fit..
+                break
+
+            log.info(
+                f'Shm pushed {ln} frame:\n'
+                f'{start_dt} -> {end_dt}'
+            )
+
+            if (
+                storage is not None
+                and write_tsdb
+            ):
+                log.info(
+                    f'Writing {ln} frame to storage:\n'
+                    f'{start_dt} -> {end_dt}'
+                )
+                await storage.write_ohlcv(
+                    f'{bfqsn}.{mod.name}',  # lul..
+                    to_push,
+                    timeframe,
+                )
+
+        # TODO: can we only trigger this if the respective
+        # history is "in view"?!?
+
+        # XXX: extremely important, there can be no checkpoints
+        # in the block above to avoid entering new ``frames``
+        # values while we're pipelining the current ones to
+        # memory...
+        await sampler_stream.send('broadcast_all')
+
+        # short-circuit (for now)
+        bf_done.set()
+
+
+async def basic_backfill(
+    bus: _FeedsBus,
+    mod: ModuleType,
+    bfqsn: str,
+    shms: dict[int, ShmArray],
+    sampler_stream: tractor.MsgStream,
+    feed_is_live: trio.Event,
+
+) -> None:
+
+    # do a legacy incremental backfill from the provider.
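+    #
+    # NOTE (editorial sketch): "basic" here means provider-only, ie.
+    # each timeframe's shm buffer is filled straight from
+    # `start_backfill()` with no tsdb reads or writes. A hypothetical
+    # call-site (argument values illustrative only) looks like:
+    #
+    #   await basic_backfill(
+    #       bus, mod, bfqsn,
+    #       {1: rt_shm, 60: hist_shm},
+    #       sampler_stream, feed_is_live,
+    #   )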
+    log.info('No TSDB (marketstored) found, doing basic backfill..')
+
+    # start the history backfill task; ``backfill_bars()`` is
+    # a required backend func. this must block until shm is
+    # filled with the first set of ohlc bars.
+    for timeframe, shm in shms.items():
+        try:
+            await bus.nursery.start(
+                partial(
+                    start_backfill,
+                    mod,
+                    bfqsn,
+                    shm,
+                    timeframe,
+                    sampler_stream,
+                    feed_is_live,
+                )
+            )
+        except DataUnavailable:
+            # XXX: timeframe not supported for backend
+            continue
+
+
+async def tsdb_backfill(
+    mod: ModuleType,
+    marketstore: ModuleType,
+    bus: _FeedsBus,
+    storage: Storage,
+    fqsn: str,
+    bfqsn: str,
+    shms: dict[int, ShmArray],
+    sampler_stream: tractor.MsgStream,
+    feed_is_live: trio.Event,
+
+    task_status: TaskStatus[
+        tuple[ShmArray, ShmArray]
+    ] = trio.TASK_STATUS_IGNORED,
+
+) -> None:
+
+    # TODO: this should be used verbatim for the pure
+    # shm backfiller approach below.
+    dts_per_tf: dict[int, datetime] = {}
+
+    # start history analysis and load missing new data via backend.
+    for timeframe, shm in shms.items():
+        # loads a (large) frame of data from the tsdb depending
+        # on the db's query size limit.
+        tsdb_history, first_tsdb_dt, last_tsdb_dt = await storage.load(
+            fqsn,
+            timeframe=timeframe,
+        )
+
+        broker, *_ = unpack_fqme(fqsn)
+        try:
+            (
+                latest_start_dt,
+                latest_end_dt,
+                bf_done,
+            ) = await bus.nursery.start(
+                partial(
+                    start_backfill,
+                    mod,
+                    bfqsn,
+                    shm,
+                    timeframe,
+                    sampler_stream,
+                    feed_is_live,
+
+                    last_tsdb_dt=last_tsdb_dt,
+                    tsdb_is_up=True,
+                    storage=storage,
+                )
+            )
+        except DataUnavailable:
+            # XXX: timeframe not supported for backend
+            dts_per_tf[timeframe] = (
+                tsdb_history,
+                last_tsdb_dt,
+                None,
+                None,
+                None,
+            )
+            continue
+
+        # tsdb_history = series.get(timeframe)
+        dts_per_tf[timeframe] = (
+            tsdb_history,
+            last_tsdb_dt,
+            latest_start_dt,
+            latest_end_dt,
+            bf_done,
+        )
+
+        # if len(hist_shm.array) < 2:
+        # TODO: there's an edge case here to solve where if the last
+        # frame before market close (at least on ib) was pushed and
+        # there was only "1 new" row pushed from the first backfill
+        # query-iteration, then the sample step sizing calcs will
+        # break upstream from here since you can't diff on at least
+        # 2 steps... probably should also add logic to compute from
+        # the tsdb series and stash that somewhere as meta data on
+        # the shm buffer?.. dunno.
+
+    # unblock the feed bus management task
+    # assert len(shms[1].array)
+    task_status.started()
+
+    async def back_load_from_tsdb(
+        timeframe: int,
+        shm: ShmArray,
+    ):
+        (
+            tsdb_history,
+            last_tsdb_dt,
+            latest_start_dt,
+            latest_end_dt,
+            bf_done,
+        ) = dts_per_tf[timeframe]
+
+        # sync to backend history task's query/load completion
+        if bf_done:
+            await bf_done.wait()
+
+        # TODO: eventually it'd be nice to not require a shm array/buffer
+        # to accomplish this.. maybe we can do some kind of tsdb direct to
+        # graphics format eventually in a child-actor?
+
+        # TODO: see if there's faster multi-field reads:
+        # https://numpy.org/doc/stable/user/basics.rec.html#accessing-multiple-fields
+        # re-index with a `time` and index field
+        prepend_start = shm._first.value
+        array = shm.array
+        if len(array):
+            shm_last_dt = pendulum.from_timestamp(shm.array[0]['time'])
+        else:
+            shm_last_dt = None
+
+        if last_tsdb_dt:
+            assert shm_last_dt >= last_tsdb_dt
+
+        # do a diff against the start index of the last frame of
+        # history and only fill in the amount of datums from the tsdb
+        # which allows the most recent (backend) data to be loaded into
+        # mem *before* the tsdb data.
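+        #
+        # NOTE (editorial sketch): with purely illustrative values, if
+        # the backend's earliest queried frame starts at 12:00:00 and
+        # the tsdb's last stored datum is at 11:30:00 then
+        # `backfilled_size_s` computed below is 1800 (seconds), ie. the
+        # span the backend already filled ahead of tsdb backloading.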
+        if (
+            last_tsdb_dt
+            and latest_start_dt
+        ):
+            backfilled_size_s = (
+                latest_start_dt - last_tsdb_dt
+            ).seconds
+            # if the shm buffer len is not large enough to contain
+            # all missing data between the most recent backend-queried frame
+            # and the most recent dt-index in the db we warn that we only
+            # want to load a portion of the next tsdb query to fill that
+            # space.
+            log.info(
+                f'{backfilled_size_s} seconds worth of {timeframe}s loaded'
+            )
+
+        # Load TSDB history into shm buffer (for display) if there is
+        # remaining buffer space.
+
+        if (
+            len(tsdb_history)
+        ):
+            # load the first (smaller) bit of history originally loaded
+            # above from ``Storage.load()``.
+            to_push = tsdb_history[-prepend_start:]
+            shm.push(
+                to_push,
+
+                # insert the history pre a "days worth" of samples
+                # to leave some real-time buffer space at the end.
+                prepend=True,
+                # update_first=False,
+                # start=prepend_start,
+                field_map=marketstore.ohlc_key_map,
+            )
+
+            tsdb_last_frame_start = tsdb_history['Epoch'][0]
+
+            if timeframe == 1:
+                times = shm.array['time']
+                assert (times[1] - times[0]) == 1
+
+            # load as much from storage into shm as possible (depends on
+            # user's shm size settings).
+            while shm._first.value > 0:
+
+                tsdb_history = await storage.read_ohlcv(
+                    fqsn,
+                    timeframe=timeframe,
+                    end=tsdb_last_frame_start,
+                )
+
+                # empty query
+                if not len(tsdb_history):
+                    break
+
+                next_start = tsdb_history['Epoch'][0]
+                if next_start >= tsdb_last_frame_start:
+                    # no earlier data detected
+                    break
+                else:
+                    tsdb_last_frame_start = next_start
+
+                prepend_start = shm._first.value
+                to_push = tsdb_history[-prepend_start:]
+
+                # insert the history pre a "days worth" of samples
+                # to leave some real-time buffer space at the end.
+                shm.push(
+                    to_push,
+                    prepend=True,
+                    field_map=marketstore.ohlc_key_map,
+                )
+                log.info(f'Loaded {to_push.shape} datums from storage')
+
+            # manually trigger step update to update charts/fsps
+            # which need an incremental update.
+            # NOTE: the way this works is super duper
+            # un-intuitive right now:
+            # - the broadcaster fires a msg to the fsp subsystem.
+            # - fsp subsys then checks for a sample step diff and
+            #   possibly recomputes prepended history.
+            # - the fsp then sends back to the parent actor
+            #   (usually a chart showing graphics for said fsp)
+            #   which tells the chart to conduct a manual full
+            #   graphics loop cycle.
+            await sampler_stream.send('broadcast_all')
+
+        # TODO: write new data to tsdb to be ready for the next read.
+
+    # backload from db (concurrently per timeframe) once backfilling of
+    # recent data from the backend provider is done (see the
+    # ``bf_done.wait()`` call).
+    async with trio.open_nursery() as nurse:
+        for timeframe, shm in shms.items():
+            nurse.start_soon(
+                back_load_from_tsdb,
+                timeframe,
+                shm,
+            )
+
+
+async def manage_history(
+    mod: ModuleType,
+    bus: _FeedsBus,
+    fqsn: str,
+    some_data_ready: trio.Event,
+    feed_is_live: trio.Event,
+    timeframe: float = 60,  # in seconds
+
+    task_status: TaskStatus[
+        tuple[ShmArray, ShmArray]
+    ] = trio.TASK_STATUS_IGNORED,
+
+) -> None:
+    '''
+    Load and manage historical data including the loading of any
+    available series from `marketstore` as well as conducting real-time
+    update of both that existing db and the allocated shared memory
+    buffer.
+
+    '''
+
+    # TODO: is there a way to make each shm file key
+    # actor-tree-discovery-addr unique so we avoid collisions
+    # when doing tests which also allocate shms for certain instruments
+    # that may be in use on the system by some other running daemons?
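+    #
+    # NOTE (editorial sketch): for a hypothetical actor uid like
+    # ('brokerd.binance', '3f1a2b...') the keys built below come out
+    # roughly as `piker.brokerd[3f1a2b....<fqsn>.hist` / `...rt`;
+    # values are illustrative only. also beware that `str.rstrip()`
+    # strips a trailing *character set*, not a suffix, so the
+    # `service` derivation below relies on the current naming scheme.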
+    # from tractor._state import _runtime_vars
+    # port = _runtime_vars['_root_mailbox'][1]
+
+    uid = tractor.current_actor().uid
+    name, uuid = uid
+    service = name.rstrip(f'.{mod.name}')
+
+    # (maybe) allocate shm array for this broker/symbol which will
+    # be used for fast near-term history capture and processing.
+    hist_shm, opened = maybe_open_shm_array(
+        # key=f'{fqsn}_hist_p{port}',
+        key=f'piker.{service}[{uuid[:16]}.{fqsn}.hist',
+
+        # use any broker defined ohlc dtype:
+        dtype=getattr(mod, '_ohlc_dtype', base_iohlc_dtype),
+
+        # we expect the sub-actor to write
+        readonly=False,
+    )
+    hist_zero_index = hist_shm.index - 1
+
+    # TODO: history validation
+    if not opened:
+        raise RuntimeError(
+            "Persistent shm for sym was already open?!"
+        )
+
+    rt_shm, opened = maybe_open_shm_array(
+        # key=f'{fqsn}_rt_p{port}',
+        # key=f'piker.{service}.{fqsn}_rt.{uuid}',
+        key=f'piker.{service}[{uuid[:16]}.{fqsn}.rt',
+
+        # use any broker defined ohlc dtype:
+        dtype=getattr(mod, '_ohlc_dtype', base_iohlc_dtype),
+
+        # we expect the sub-actor to write
+        readonly=False,
+        size=3*_secs_in_day,
+    )
+
+    # (for now) set the rt (hft) shm array with space to prepend
+    # only a few days worth of 1s history.
+    days = 2
+    start_index = days*_secs_in_day
+    rt_shm._first.value = start_index
+    rt_shm._last.value = start_index
+    rt_zero_index = rt_shm.index - 1
+
+    if not opened:
+        raise RuntimeError(
+            "Persistent shm for sym was already open?!"
+        )
+
+    # register 1s and 1m buffers with the global incrementer task
+    async with open_sample_stream(
+        period_s=1.,
+        shms_by_period={
+            1.: rt_shm.token,
+            60.: hist_shm.token,
+        },
+
+        # NOTE: we want to only open a stream for doing broadcasts on
+        # backfill operations, not receive the sample index-stream
+        # (since there's no code in this data feed layer that needs to
+        # consume it).
+        open_index_stream=True,
+        sub_for_broadcasts=False,
+
+    ) as sample_stream:
+
+        log.info('Scanning for existing `marketstored`')
+        tsdb_is_up = await check_for_service('marketstored')
+
+        bfqsn = fqsn.replace('.' + mod.name, '')
+        open_history_client = getattr(mod, 'open_history_client', None)
+        assert open_history_client
+
+        if (
+            tsdb_is_up
+            and opened
+            and open_history_client
+        ):
+            log.info('Found existing `marketstored`')
+
+            from ..service import marketstore
+            async with (
+                marketstore.open_storage_client(fqsn) as storage,
+            ):
+                # TODO: drop returning the output that we pass in?
+                await bus.nursery.start(
+                    tsdb_backfill,
+                    mod,
+                    marketstore,
+                    bus,
+                    storage,
+                    fqsn,
+                    bfqsn,
+                    {
+                        1: rt_shm,
+                        60: hist_shm,
+                    },
+                    sample_stream,
+                    feed_is_live,
+                )
+
+                # yield back after client connect with filled shm
+                task_status.started((
+                    hist_zero_index,
+                    hist_shm,
+                    rt_zero_index,
+                    rt_shm,
+                ))
+
+                # indicate to caller that feed can be delivered to
+                # remote requesting client since we've loaded history
+                # data that can be used.
+                some_data_ready.set()
+
+                # history retrieval loop depending on user interaction
+                # and thus a small RPC-protocol for remotely controlling
+                # what data is loaded for viewing.
+                await trio.sleep_forever()
+
+        # load less history if no tsdb can be found
+        elif (
+            not tsdb_is_up
+            and opened
+        ):
+            await basic_backfill(
+                bus,
+                mod,
+                bfqsn,
+                {
+                    1: rt_shm,
+                    60: hist_shm,
+                },
+                sample_stream,
+                feed_is_live,
+            )
+            task_status.started((
+                hist_zero_index,
+                hist_shm,
+                rt_zero_index,
+                rt_shm,
+            ))
+            some_data_ready.set()
+            await trio.sleep_forever()
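+
+
+# NOTE (editorial sketch): `.data.feed` imports `manage_history` from
+# here (see the `from .history import (manage_history,)` hunk above); a
+# feed-bus task is expected to spawn it roughly like the following
+# (names illustrative, see `allocate_persistent_feed()` in `feed.py`
+# for the real call-site):
+#
+#   izero_hist, hist_shm, izero_rt, rt_shm = await bus.nursery.start(
+#       manage_history, mod, bus, fqsn, some_data_ready, feed_is_live,
+#   )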