WIP get `pikerd` working with and without `--tsdb` flag

mkts_backup
Tyler Goodlet 2022-03-08 09:31:12 -05:00
parent 820dfff08a
commit 5775c5fe71
1 changed files with 102 additions and 70 deletions

View File

@ -22,6 +22,7 @@ This module is enabled for ``brokerd`` daemons.
"""
from __future__ import annotations
from dataclasses import dataclass, field
from datetime import datetime
from contextlib import asynccontextmanager
from functools import partial
from types import ModuleType
@ -42,11 +43,13 @@ from .._cacheables import maybe_open_context
from ..log import get_logger, get_console_log
from .._daemon import (
maybe_spawn_brokerd,
check_for_service,
)
from ._sharedmem import (
maybe_open_shm_array,
attach_shm_array,
ShmArray,
_secs_in_day,
)
from .ingest import get_ingestormod
from ._source import (
@ -191,10 +194,8 @@ async def _setup_persistent_brokerd(
async def manage_history(
mod: ModuleType,
shm: ShmArray,
bus: _FeedsBus,
symbol: str,
we_opened_shm: bool,
some_data_ready: trio.Event,
feed_is_live: trio.Event,
@ -208,68 +209,108 @@ async def manage_history(
buffer.
'''
# TODO: history retreival, see if we can pull from an existing
# ``marketstored`` daemon
opened = we_opened_shm
fqsn = mk_fqsn(mod.name, symbol)
from . import marketstore
# (maybe) allocate shm array for this broker/symbol which will
# be used for fast near-term history capture and processing.
shm, opened = maybe_open_shm_array(
key=fqsn,
# use any broker defined ohlc dtype:
dtype=getattr(mod, '_ohlc_dtype', base_iohlc_dtype),
# we expect the sub-actor to write
readonly=False,
)
log.info('Scanning for existing `marketstored`')
is_up = await check_for_service('marketstored')
if is_up and opened:
log.info('Found existing `marketstored`')
from . import marketstore
async with marketstore.open_storage_client(
fqsn,
) as (storage, arrays):
async with marketstore.open_storage_client(
fqsn,
) as (storage, tsdb_arrays):
# TODO: history validation
# assert opened, f'Persistent shm for {symbol} was already open?!'
# if not opened:
# raise RuntimeError("Persistent shm for sym was already open?!")
# yield back after client connect
task_status.started()
if tsdb_arrays:
log.info(f'Loaded tsdb history {tsdb_arrays}')
fastest = list(tsdb_arrays[fqsn].values())[0]
last_s = fastest['Epoch'][-1]
# TODO: history validation
# assert opened, f'Persistent shm for {symbol} was already open?!'
# if not opened:
# raise RuntimeError("Persistent shm for sym was already open?!")
# TODO: see if there's faster multi-field reads:
# https://numpy.org/doc/stable/user/basics.rec.html#accessing-multiple-fields
if opened:
if arrays:
# re-index with a `time` and index field
shm.push(
fastest[-3 * _secs_in_day:],
log.info(f'Loaded tsdb history {arrays}')
# push to shm
# set data ready
# some_data_ready.set()
# ask broker backend for new history
# start history backfill task ``backfill_bars()`` is
# a required backend func this must block until shm is
# filled with first set of ohlc bars
_ = await bus.nursery.start(mod.backfill_bars, symbol, shm)
# indicate to caller that feed can be delivered to
# remote requesting client since we've loaded history
# data that can be used.
some_data_ready.set()
# detect sample step size for sampled historical data
times = shm.array['time']
delay_s = times[-1] - times[times != times[-1]][-1]
# begin real-time updates of shm and tsb once the feed
# goes live.
await feed_is_live.wait()
if opened:
sampler.ohlcv_shms.setdefault(delay_s, []).append(shm)
# start shm incrementing for OHLC sampling at the current
# detected sampling period if one dne.
if sampler.incrementers.get(delay_s) is None:
await bus.start_task(
increment_ohlc_buffer,
delay_s,
# insert the history pre a "days worth" of samples
# to leave some real-time buffer space at the end.
prepend=True,
start=shm._len - _secs_in_day,
field_map={
'Epoch': 'time',
'Open': 'open',
'High': 'high',
'Low': 'low',
'Close': 'close',
'Volume': 'volume',
},
)
await trio.sleep_forever()
# cs.cancel()
# start history anal and load missing new data via backend.
async with mod.open_history_client(symbol) as hist:
# get latest query's worth of history
array, next_dt = await hist(end_dt='')
last_dt = datetime.fromtimestamp(last_s)
array, next_dt = await hist(end_dt=last_dt)
some_data_ready.set()
elif opened:
log.info('No existing `marketstored` found..')
# start history backfill task ``backfill_bars()`` is
# a required backend func this must block until shm is
# filled with first set of ohlc bars
_ = await bus.nursery.start(mod.backfill_bars, symbol, shm)
# yield back after client connect with filled shm
task_status.started(shm)
# indicate to caller that feed can be delivered to
# remote requesting client since we've loaded history
# data that can be used.
some_data_ready.set()
# detect sample step size for sampled historical data
times = shm.array['time']
delay_s = times[-1] - times[times != times[-1]][-1]
# begin real-time updates of shm and tsb once the feed
# goes live.
await feed_is_live.wait()
if opened:
sampler.ohlcv_shms.setdefault(delay_s, []).append(shm)
# start shm incrementing for OHLC sampling at the current
# detected sampling period if one dne.
if sampler.incrementers.get(delay_s) is None:
await bus.start_task(
increment_ohlc_buffer,
delay_s,
)
await trio.sleep_forever()
# cs.cancel()
async def allocate_persistent_feed(
@ -301,18 +342,6 @@ async def allocate_persistent_feed(
fqsn = mk_fqsn(brokername, symbol)
# (maybe) allocate shm array for this broker/symbol which will
# be used for fast near-term history capture and processing.
shm, opened = maybe_open_shm_array(
key=fqsn,
# use any broker defined ohlc dtype:
dtype=getattr(mod, '_ohlc_dtype', base_iohlc_dtype),
# we expect the sub-actor to write
readonly=False,
)
# mem chan handed to broker backend so it can push real-time
# quotes to this task for sampling and history storage (see below).
send, quote_stream = trio.open_memory_channel(10)
@ -329,15 +358,13 @@ async def allocate_persistent_feed(
# XXX: neither of these will raise but will cause an inf hang due to:
# https://github.com/python-trio/trio/issues/2258
# bus.nursery.start_soon(
# await bus.start_task(
# await bus.nursery.start(
await bus.start_task(
shm = await bus.nursery.start(
manage_history,
mod,
shm,
bus,
symbol,
opened,
some_data_ready,
feed_is_live,
)
@ -478,6 +505,11 @@ async def open_feed_bus(
async with (
ctx.open_stream() as stream,
):
# re-send to trigger display loop cycle (necessary especially
# when the mkt is closed and no real-time messages are
# expected).
await stream.send(first_quotes)
if tick_throttle:
# open a bg task which receives quotes over a mem chan