diff --git a/piker/data/feed.py b/piker/data/feed.py index c732a8d1..6ff07b2b 100644 --- a/piker/data/feed.py +++ b/piker/data/feed.py @@ -22,6 +22,7 @@ This module is enabled for ``brokerd`` daemons. """ from __future__ import annotations from dataclasses import dataclass, field +from datetime import datetime from contextlib import asynccontextmanager from functools import partial from types import ModuleType @@ -42,11 +43,13 @@ from .._cacheables import maybe_open_context from ..log import get_logger, get_console_log from .._daemon import ( maybe_spawn_brokerd, + check_for_service, ) from ._sharedmem import ( maybe_open_shm_array, attach_shm_array, ShmArray, + _secs_in_day, ) from .ingest import get_ingestormod from ._source import ( @@ -191,10 +194,8 @@ async def _setup_persistent_brokerd( async def manage_history( mod: ModuleType, - shm: ShmArray, bus: _FeedsBus, symbol: str, - we_opened_shm: bool, some_data_ready: trio.Event, feed_is_live: trio.Event, @@ -208,68 +209,108 @@ async def manage_history( buffer. ''' - # TODO: history retreival, see if we can pull from an existing - # ``marketstored`` daemon - - opened = we_opened_shm fqsn = mk_fqsn(mod.name, symbol) - from . import marketstore + # (maybe) allocate shm array for this broker/symbol which will + # be used for fast near-term history capture and processing. + shm, opened = maybe_open_shm_array( + key=fqsn, + + # use any broker defined ohlc dtype: + dtype=getattr(mod, '_ohlc_dtype', base_iohlc_dtype), + + # we expect the sub-actor to write + readonly=False, + ) + log.info('Scanning for existing `marketstored`') + is_up = await check_for_service('marketstored') + if is_up and opened: + log.info('Found existing `marketstored`') + from . import marketstore - async with marketstore.open_storage_client( - fqsn, - ) as (storage, arrays): + async with marketstore.open_storage_client( + fqsn, + ) as (storage, tsdb_arrays): + # TODO: history validation + # assert opened, f'Persistent shm for {symbol} was already open?!' + # if not opened: + # raise RuntimeError("Persistent shm for sym was already open?!") - # yield back after client connect - task_status.started() + if tsdb_arrays: + log.info(f'Loaded tsdb history {tsdb_arrays}') + fastest = list(tsdb_arrays[fqsn].values())[0] + last_s = fastest['Epoch'][-1] - # TODO: history validation - # assert opened, f'Persistent shm for {symbol} was already open?!' - # if not opened: - # raise RuntimeError("Persistent shm for sym was already open?!") + # TODO: see if there's faster multi-field reads: + # https://numpy.org/doc/stable/user/basics.rec.html#accessing-multiple-fields - if opened: - if arrays: + # re-index with a `time` and index field + shm.push( + fastest[-3 * _secs_in_day:], - log.info(f'Loaded tsdb history {arrays}') - # push to shm - # set data ready - # some_data_ready.set() - - # ask broker backend for new history - - # start history backfill task ``backfill_bars()`` is - # a required backend func this must block until shm is - # filled with first set of ohlc bars - _ = await bus.nursery.start(mod.backfill_bars, symbol, shm) - - # indicate to caller that feed can be delivered to - # remote requesting client since we've loaded history - # data that can be used. - some_data_ready.set() - - # detect sample step size for sampled historical data - times = shm.array['time'] - delay_s = times[-1] - times[times != times[-1]][-1] - - # begin real-time updates of shm and tsb once the feed - # goes live. - await feed_is_live.wait() - - if opened: - sampler.ohlcv_shms.setdefault(delay_s, []).append(shm) - - # start shm incrementing for OHLC sampling at the current - # detected sampling period if one dne. - if sampler.incrementers.get(delay_s) is None: - await bus.start_task( - increment_ohlc_buffer, - delay_s, + # insert the history pre a "days worth" of samples + # to leave some real-time buffer space at the end. + prepend=True, + start=shm._len - _secs_in_day, + field_map={ + 'Epoch': 'time', + 'Open': 'open', + 'High': 'high', + 'Low': 'low', + 'Close': 'close', + 'Volume': 'volume', + }, ) - await trio.sleep_forever() - # cs.cancel() + # start history anal and load missing new data via backend. + async with mod.open_history_client(symbol) as hist: + + # get latest query's worth of history + array, next_dt = await hist(end_dt='') + + last_dt = datetime.fromtimestamp(last_s) + array, next_dt = await hist(end_dt=last_dt) + + some_data_ready.set() + + elif opened: + log.info('No existing `marketstored` found..') + + # start history backfill task ``backfill_bars()`` is + # a required backend func this must block until shm is + # filled with first set of ohlc bars + _ = await bus.nursery.start(mod.backfill_bars, symbol, shm) + + # yield back after client connect with filled shm + task_status.started(shm) + + # indicate to caller that feed can be delivered to + # remote requesting client since we've loaded history + # data that can be used. + some_data_ready.set() + + # detect sample step size for sampled historical data + times = shm.array['time'] + delay_s = times[-1] - times[times != times[-1]][-1] + + # begin real-time updates of shm and tsb once the feed + # goes live. + await feed_is_live.wait() + + if opened: + sampler.ohlcv_shms.setdefault(delay_s, []).append(shm) + + # start shm incrementing for OHLC sampling at the current + # detected sampling period if one dne. + if sampler.incrementers.get(delay_s) is None: + await bus.start_task( + increment_ohlc_buffer, + delay_s, + ) + + await trio.sleep_forever() + # cs.cancel() async def allocate_persistent_feed( @@ -301,18 +342,6 @@ async def allocate_persistent_feed( fqsn = mk_fqsn(brokername, symbol) - # (maybe) allocate shm array for this broker/symbol which will - # be used for fast near-term history capture and processing. - shm, opened = maybe_open_shm_array( - key=fqsn, - - # use any broker defined ohlc dtype: - dtype=getattr(mod, '_ohlc_dtype', base_iohlc_dtype), - - # we expect the sub-actor to write - readonly=False, - ) - # mem chan handed to broker backend so it can push real-time # quotes to this task for sampling and history storage (see below). send, quote_stream = trio.open_memory_channel(10) @@ -329,15 +358,13 @@ async def allocate_persistent_feed( # XXX: neither of these will raise but will cause an inf hang due to: # https://github.com/python-trio/trio/issues/2258 # bus.nursery.start_soon( + # await bus.start_task( - # await bus.nursery.start( - await bus.start_task( + shm = await bus.nursery.start( manage_history, mod, - shm, bus, symbol, - opened, some_data_ready, feed_is_live, ) @@ -478,6 +505,11 @@ async def open_feed_bus( async with ( ctx.open_stream() as stream, ): + # re-send to trigger display loop cycle (necessary especially + # when the mkt is closed and no real-time messages are + # expected). + await stream.send(first_quotes) + if tick_throttle: # open a bg task which receives quotes over a mem chan