Prototype a high level `Storage` api
Starts a wrapper around the `marketstore` client to do basic OHLCV query and retrieval, and prototypes out write methods for OHLCV and tick data. We try to connect to `marketstore` automatically (which will currently fail if it isn't already running), but eventually a service query should come first. Further:

- get `pikerd` working with and without the `--tsdb` flag.
- support spawning `brokerd` with no real-time quotes.
- bring back the "fqsn" support that was originally not in this history prior to commit factoring.

branch: l1_precision_fix
commit: 706c8085f2 (parent: cbe74d126e)
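
For orientation, a minimal usage sketch of the new api; this assumes a running `marketstored`, that the module is importable as `piker.data.marketstore` (the diff's relative imports suggest so), and an illustrative symbol value:

import trio
from piker.data import marketstore


async def main() -> None:
    fqsn = 'mnq.globex.ib'  # illustrative fully qualified symbol name

    # connect via `get_client()`, wrap the connection in a `Storage`
    # instance and eagerly read back any tsdb history for `fqsn`
    async with marketstore.open_storage_client(fqsn) as (storage, arrays):
        print(f'existing tsdb series: {arrays}')

        # or query one granularity explicitly (1s bars here):
        client, ohlcv = await storage.read_ohlcv(fqsn, timeframe=1)

trio.run(main)
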
feed.py:

@@ -22,6 +22,7 @@ This module is enabled for ``brokerd`` daemons.
 """
 from __future__ import annotations
 from dataclasses import dataclass, field
+from datetime import datetime
 from contextlib import asynccontextmanager
 from functools import partial
 from types import ModuleType
@@ -42,11 +43,13 @@ from .._cacheables import maybe_open_context
 from ..log import get_logger, get_console_log
 from .._daemon import (
     maybe_spawn_brokerd,
+    check_for_service,
 )
 from ._sharedmem import (
     maybe_open_shm_array,
     attach_shm_array,
     ShmArray,
+    _secs_in_day,
 )
 from .ingest import get_ingestormod
 from ._source import (
@@ -125,7 +128,7 @@ class _FeedsBus(BaseModel):

     # def cancel_task(
     #     self,
-    #     task: trio.lowlevel.Task
+    #     task: trio.lowlevel.Task,
     # ) -> bool:
     #     ...

@@ -218,7 +221,61 @@ async def manage_history(
         readonly=False,
     )

-    if opened:
+    log.info('Scanning for existing `marketstored`')
+    is_up = await check_for_service('marketstored')
+    if is_up and opened:
+        log.info('Found existing `marketstored`')
+        from . import marketstore
+
+        async with marketstore.open_storage_client(
+            fqsn,
+        ) as (storage, tsdb_arrays):
+
+            # TODO: history validation
+            # assert opened, f'Persistent shm for {symbol} was already open?!'
+            # if not opened:
+            #     raise RuntimeError(
+            #         "Persistent shm for sym was already open?!"
+            #     )
+
+            if tsdb_arrays:
+                log.info(f'Loaded tsdb history {tsdb_arrays}')
+                fastest = list(tsdb_arrays[fqsn].values())[0]
+                last_s = fastest['Epoch'][-1]
+
+                # TODO: see if there's faster multi-field reads:
+                # https://numpy.org/doc/stable/user/basics.rec.html#accessing-multiple-fields
+
+                # re-index with a `time` and index field
+                shm.push(
+                    fastest[-3 * _secs_in_day:],
+
+                    # insert the history pre a "days worth" of samples
+                    # to leave some real-time buffer space at the end.
+                    prepend=True,
+                    start=shm._len - _secs_in_day,
+                    field_map={
+                        'Epoch': 'time',
+                        'Open': 'open',
+                        'High': 'high',
+                        'Low': 'low',
+                        'Close': 'close',
+                        'Volume': 'volume',
+                    },
+                )
+
+                # start history anal and load missing new data via backend.
+                async with mod.open_history_client(fqsn) as hist:
+
+                    # get latest query's worth of history
+                    array, next_dt = await hist(end_dt='')
+
+                    last_dt = datetime.fromtimestamp(last_s)
+                    array, next_dt = await hist(end_dt=last_dt)
+
+            some_data_ready.set()
+
+    elif opened:
         log.info('No existing `marketstored` found..')

         # start history backfill task ``backfill_bars()`` is
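
The `field_map` passed to `shm.push()` above re-labels `marketstore`'s capitalized OHLCV column names to the shm buffer's lowercase schema. Roughly the same re-labeling on a bare `numpy` struct array (the dtype widths here are assumptions for illustration):

import numpy as np

# marketstore-style rows keyed by its capitalized column names
tsdb_rows = np.zeros(
    3,
    dtype=[
        ('Epoch', 'i8'),
        ('Open', 'f8'),
        ('High', 'f8'),
        ('Low', 'f8'),
        ('Close', 'f8'),
        ('Volume', 'f8'),
    ],
)

field_map = {
    'Epoch': 'time',
    'Open': 'open',
    'High': 'high',
    'Low': 'low',
    'Close': 'close',
    'Volume': 'volume',
}

# renaming struct array fields is a dtype-level relabel; the
# underlying buffer is untouched
tsdb_rows.dtype.names = tuple(
    field_map[name] for name in tsdb_rows.dtype.names
)
assert tsdb_rows.dtype.names == (
    'time', 'open', 'high', 'low', 'close', 'volume',
)
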
@@ -254,6 +311,7 @@ async def manage_history(
             )

     await trio.sleep_forever()
+    # cs.cancel()


 async def allocate_persistent_feed(
@@ -261,6 +319,7 @@ async def allocate_persistent_feed(
     brokername: str,
     symbol: str,
     loglevel: str,
+    start_stream: bool = True,

     task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,

@@ -302,10 +361,8 @@ async def allocate_persistent_feed(
             loglevel=loglevel,
         )
     )
-    # the broker-specific fully qualified symbol name,
-    # but ensure it is lower-cased for external use.
-    bfqsn = init_msg[symbol]['fqsn'].lower()
-    init_msg[symbol]['fqsn'] = bfqsn
+    # the broker-specific fully qualified symbol name
+    bfqsn = init_msg[symbol]['fqsn']

     # HISTORY, run 2 tasks:
     # - a history loader / maintainer
@@ -333,6 +390,7 @@ async def allocate_persistent_feed(

     # true fqsn
     fqsn = '.'.join((bfqsn, brokername))
+
     # add a fqsn entry that includes the ``.<broker>`` suffix
     init_msg[fqsn] = msg

@@ -364,6 +422,9 @@ async def allocate_persistent_feed(
     # task_status.started((init_msg,  generic_first_quotes))
     task_status.started()

+    if not start_stream:
+        await trio.sleep_forever()
+
     # backend will indicate when real-time quotes have begun.
     await feed_is_live.wait()

@@ -429,13 +490,12 @@ async def open_feed_bus(

                     bus=bus,
                     brokername=brokername,
-
                     # here we pass through the selected symbol in native
                     # "format" (i.e. upper vs. lowercase depending on
                     # provider).
                     symbol=symbol,
-
                     loglevel=loglevel,
+                    start_stream=start_stream,
                 )
             )
             # TODO: we can remove this?
@@ -446,7 +506,7 @@ async def open_feed_bus(
     init_msg, first_quotes = bus.feeds[symbol]

     msg = init_msg[symbol]
-    bfqsn = msg['fqsn'].lower()
+    bfqsn = msg['fqsn']

     # true fqsn
     fqsn = '.'.join([bfqsn, brokername])
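
For reference, the "true fqsn" assembled here is just the broker-specific symbol with a `.<broker>` suffix, e.g. with the broker/symbol pair used in `backfill_history_diff()` further below:

bfqsn = 'mnq.globex'  # broker-specific fully qualified symbol name
brokername = 'ib'
fqsn = '.'.join([bfqsn, brokername])
assert fqsn == 'mnq.globex.ib'
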
@@ -765,10 +825,7 @@ async def maybe_open_feed(

     **kwargs,

-) -> (
-    Feed,
-    ReceiveChannel[dict[str, Any]],
-):
+) -> (Feed, ReceiveChannel[dict[str, Any]]):
     '''
     Maybe open a data to a ``brokerd`` daemon only if there is no
     local one for the broker-symbol pair, if one is cached use it wrapped
@@ -789,7 +846,6 @@ async def maybe_open_feed(
             'start_stream': kwargs.get('start_stream', True),
         },
         key=fqsn,
-
     ) as (cache_hit, feed):

         if cache_hit:
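
With the `start_stream` plumbing above, a consumer can spin up a feed without blocking on live quotes. A hypothetical call follows; `maybe_open_feed`'s leading (brokername, symbols) arguments are assumed here, only the `start_stream` kwarg forwarding is shown in the hunks above:

# hypothetical: positional args are assumed, `start_stream` is
# forwarded verbatim per the hunk above
async with maybe_open_feed(
    'ib',
    ['mnq.globex'],
    start_stream=False,  # spawn/attach `brokerd`, skip waiting for rt quotes
) as (feed, stream):
    ...
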
marketstore.py:

@@ -28,8 +28,9 @@ from pprint import pformat
 from typing import (
     Any,
     Optional,
+    Union,
     # Callable,
-    TYPE_CHECKING,
+    # TYPE_CHECKING,
 )
 import time
 from math import isnan
@@ -40,12 +41,19 @@ import numpy as np
 import pandas as pd
 import tractor
 from trio_websocket import open_websocket_url
-from anyio_marketstore import open_marketstore_client, MarketstoreClient, Params
+from anyio_marketstore import (
+    open_marketstore_client,
+    MarketstoreClient,
+    Params,
+)
 import purerpc

-from ..log import get_logger, get_console_log
-from ._source import mk_fqsn, Symbol
-
+from .feed import maybe_open_feed
+from ._source import (
+    mk_fqsn,
+    # Symbol,
+)
+from ..log import get_logger, get_console_log

 # if TYPE_CHECKING:
 #     from ._sharedmem import ShmArray
@@ -210,42 +218,130 @@ tf_in_1s = bidict({
 })


-async def manage_history(
-    fqsn: str,
-    period: int = 1,  # in seconds
-
-) -> dict[str, np.ndarray]:
-    '''
-    Load a series by key and deliver in ``numpy`` struct array
-    format.
-
-    '''
-    async with get_client() as client:
-
-        tfstr = tf_in_1s[period]
-        result = await client.query(
-            Params(fqsn, tf_in_1s, 'OHLCV',)
-        )
-        # Dig out `numpy` results map
-        arrays = {}
-        # for qr in [onem, fivem]:
-        for name, data_set in result.by_symbols().items():
-            arrays[(name, qr)] = data_set.array
-
-        await tractor.breakpoint()
-        # # TODO: backfiller loop
-        # array = arrays[(fqsn, qr)]
-        return arrays
+class Storage:
+    '''
+    High level storage api for both real-time and historical ingest.
+
+    '''
+    def __init__(
+        self,
+        client: MarketstoreClient,
+
+    ) -> None:
+        # TODO: eventually this should be an api/interface type that
+        # ensures we can support multiple tsdb backends.
+        self.client = client
+
+        # series' cache from tsdb reads
+        self._arrays: dict[str, np.ndarray] = {}
+
+    async def write_ticks(self, ticks: list) -> None:
+        ...
+
+    async def write_ohlcv(self, ohlcv: np.ndarray) -> None:
+        ...
+
+    async def read_ohlcv(
+        self,
+        fqsn: str,
+        timeframe: Optional[Union[int, str]] = None,
+
+    ) -> tuple[
+        MarketstoreClient,
+        Union[dict, np.ndarray]
+    ]:
+        client = self.client
+        syms = await client.list_symbols()
+
+        if fqsn not in syms:
+            return {}
+
+        if timeframe is None:
+            log.info(f'starting {fqsn} tsdb granularity scan..')
+            # loop through and try to find highest granularity
+            for tfstr in tf_in_1s.values():
+                try:
+                    log.info(f'querying for {tfstr}@{fqsn}')
+                    result = await client.query(Params(fqsn, tfstr, 'OHLCV',))
+                    break
+                except purerpc.grpclib.exceptions.UnknownError:
+                    # XXX: this is already logged by the container and
+                    # thus shows up through `marketstored` logs relay.
+                    # log.warning(f'{tfstr}@{fqsn} not found')
+                    continue
+            else:
+                return {}
+
+        else:
+            tfstr = tf_in_1s[timeframe]
+            result = await client.query(Params(fqsn, tfstr, 'OHLCV',))
+
+        # Fill out a `numpy` array-results map
+        arrays = {}
+        for fqsn, data_set in result.by_symbols().items():
+            arrays.setdefault(fqsn, {})[
+                tf_in_1s.inverse[data_set.timeframe]
+            ] = data_set.array
+
+        return (
+            client,
+            arrays[fqsn][timeframe] if timeframe else arrays,
+        )
+
+
+@acm
+async def open_storage_client(
+    fqsn: str,
+    period: Optional[Union[int, str]] = None,  # in seconds
+
+) -> tuple[Storage, dict[str, np.ndarray]]:
+    '''
+    Load a series by key and deliver in ``numpy`` struct array format.
+
+    '''
+    async with get_client() as client:
+
+        storage_client = Storage(client)
+        arrays = await storage_client.read_ohlcv(
+            fqsn,
+            period,
+        )
+        yield storage_client, arrays


 async def backfill_history_diff(
     # symbol: Symbol

 ) -> list[str]:
     # TODO:
     # - compute time-sample step
     # - take ``Symbol`` as input
     # - backtrack into history using backend helper endpoint

     # TODO: real-time dedicated task for ensuring
     # history consistency between the tsdb, shm and real-time feed..

     # update sequence design notes:

     # - load existing highest frequency data from mkts
     #   * how do we want to offer this to the UI?
     #    - lazy loading?
     #    - try to load it all and expect graphics caching/diffing
     #      to hide extra bits that aren't in view?

     # - compute the diff between latest data from broker and shm
     #   * use sql api in mkts to determine where the backend should
     #     start querying for data?
     #   * append any diff with new shm length
     #   * determine missing (gapped) history by scanning
     #   * how far back do we look?

     # - begin rt update ingest and aggregation
     #   * could start by always writing ticks to mkts instead of
     #     worrying about a shm queue for now.
     #   * we have a short list of shm queues worth grokking:
     #     - https://github.com/pikers/piker/issues/107
     #   * the original data feed arch blurb:
     #     - https://github.com/pikers/piker/issues/98
     #

     broker = 'ib'
     symbol = 'mnq.globex'
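
One note on the granularity scan in `Storage.read_ohlcv()` above: it relies on Python's `for ... else`, where the `else` block runs only when the loop finished without hitting `break`, i.e. when no stored timeframe matched. A tiny standalone demo of the pattern:

# `else` on a for-loop fires only if no `break` did
for tfstr in ('1Sec', '1Min', '1D'):
    if tfstr == '1Min':
        print(f'found {tfstr}')
        break
else:
    print('no timeframe found; bail with an empty result')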