From 8f1e082c91197c6c463641c46146d72ffbf2f992 Mon Sep 17 00:00:00 2001
From: Nelson Torres
Date: Tue, 4 Mar 2025 19:30:11 -0300
Subject: [PATCH] Add write_oi for open interest

The storage.nativedb mod manages the write_parquet file; this is a
rudimentary way to write to a file using parquet, meant just for
development purposes.

Add comments in max_pain to track the changes.
---
 examples/max_pain.py      | 53 ++++++++++++++++++++++++--
 piker/storage/__init__.py | 10 +++++
 piker/storage/nativedb.py | 79 ++++++++++++++++++++++++++++++++++++++-
 3 files changed, 138 insertions(+), 4 deletions(-)
 mode change 100644 => 100755 examples/max_pain.py

diff --git a/examples/max_pain.py b/examples/max_pain.py
old mode 100644
new mode 100755
index 29bbfb2f..456101f0
--- a/examples/max_pain.py
+++ b/examples/max_pain.py
@@ -2,6 +2,8 @@
 from decimal import (
     Decimal,
 )
+import numpy as np
+import polars as pl
 import trio
 import tractor
 from datetime import datetime
@@ -10,6 +12,7 @@ from piker.brokers.deribit.api import (
     get_client,
     maybe_open_oi_feed,
 )
+from piker.storage import open_storage_client, StorageClient
 import sys
 import pyqtgraph as pg
 from PyQt6 import QtCore
@@ -163,6 +166,34 @@ async def max_pain_daemon(
             {option_type: open_interest}
         )
 
+    # Define the structured dtype
+    dtype = np.dtype([
+        ('time', int),
+        ('oi', float),
+        ('oi_calc', float),
+    ])
+    async def write_open_interest_on_file(msg: tuple, client: StorageClient):
+        if 'oi' == msg[0]:
+            nonlocal expiry_date
+            timestamp = msg[1]['timestamp']
+            strike_price = msg[1]["strike_price"]
+            option_type = msg[1]['option_type'].lower()
+            col_sym_key = f'btc-{expiry_date.lower()}-{strike_price}-{option_type}'
+
+            # Create the numpy array with sample data
+            data = np.array([
+                (
+                    int(timestamp),
+                    float(msg[1]['open_interest']),
+                    np.nan,
+                ),
+            ], dtype=dtype)
+
+            path = await client.write_oi(
+                col_sym_key,
+                data,
+            )
+
     def get_max_pain(
         oi_by_strikes: dict[str, dict[str, Decimal]]
     ) -> dict[str, str | Decimal]:
@@ -188,9 +219,13 @@ async def max_pain_daemon(
             'max_pain': max_pain,
         }
 
-    async with maybe_open_oi_feed(
-        instruments,
-    ) as oi_feed:
+    async with (
+        open_storage_client() as (_, storage),
+
+        maybe_open_oi_feed(
+            instruments,
+        ) as oi_feed,
+    ):
 
         # Initialize QApplication
         app = QApplication(sys.argv)
@@ -203,9 +238,21 @@
 
 
         async for msg in oi_feed:
+            # In memory oi_by_strikes dict, all messages are filtered here
+            # and the dict is updated with the open interest data
             update_oi_by_strikes(msg)
+
+            # Write on file using storage client
+            await write_open_interest_on_file(msg, storage)
+
+            # Max pain calcs; before starting we must gather all the open interest for
+            # all the strike prices and option types available for an expiration date
             if check_if_complete(oi_by_strikes):
                 if 'oi' == msg[0]:
+                    # Here we must read from the filesystem all the latest open interest values for
+                    # each instrument for that specific expiration date, that means looking up the
+                    # last update for the instrument btc-{expiry_date}-*oi1s.parquet (1s because it is
+                    # hardcoded to something, sorry.)
                     timestamp = msg[1]['timestamp']
                     max_pain = get_max_pain(oi_by_strikes)
                     intrinsic_values = get_total_intrinsic_values(oi_by_strikes)
diff --git a/piker/storage/__init__.py b/piker/storage/__init__.py
index f32f40b6..c411cc4f 100644
--- a/piker/storage/__init__.py
+++ b/piker/storage/__init__.py
@@ -138,6 +138,16 @@ class StorageClient(
     ) -> None:
         ...
 
+    async def write_oi(
+        self,
+        fqme: str,
+        oi: np.ndarray,
+        append_and_duplicate: bool = True,
+        limit: int = int(800e3),
+
+    ) -> None:
+        ...
+
 
 class TimeseriesNotFound(Exception):
     '''
diff --git a/piker/storage/nativedb.py b/piker/storage/nativedb.py
index 8a948cab..b4ec6595 100644
--- a/piker/storage/nativedb.py
+++ b/piker/storage/nativedb.py
@@ -111,6 +111,24 @@ def mk_ohlcv_shm_keyed_filepath(
     return path
 
 
+def mk_oi_shm_keyed_filepath(
+    fqme: str,
+    period: float | int,
+    datadir: Path,
+
+) -> Path:
+
+    if period < 1.:
+        raise ValueError('Sample period should be >= 1.!?')
+
+    path: Path = (
+        datadir
+        /
+        f'{fqme}.oi{int(period)}s.parquet'
+    )
+    return path
+
+
 def unpack_fqme_from_parquet_filepath(path: Path) -> str:
 
     filename: str = str(path.name)
@@ -172,7 +190,11 @@
         key: str = path.name.rstrip('.parquet')
         fqme, _, descr = key.rpartition('.')
 
-        prefix, _, suffix = descr.partition('ohlcv')
+        if 'ohlcv' in descr:
+            prefix, _, suffix = descr.partition('ohlcv')
+        elif 'oi' in descr:
+            prefix, _, suffix = descr.partition('oi')
+
         period: int = int(suffix.strip('s'))
 
         # cache description data
@@ -369,6 +391,61 @@
             timeframe,
         )
 
+    def _write_oi(
+        self,
+        fqme: str,
+        oi: np.ndarray,
+
+    ) -> Path:
+        '''
+        Sync version of the public interface meth, since we don't
+        currently actually need or support an async impl.
+
+        '''
+        path: Path = mk_oi_shm_keyed_filepath(
+            fqme=fqme,
+            period=1,
+            datadir=self._datadir,
+        )
+        if isinstance(oi, np.ndarray):
+            new_df: pl.DataFrame = tsp.np2pl(oi)
+        else:
+            new_df = oi
+
+        if path.exists():
+            old_df = pl.read_parquet(path)
+            df = pl.concat([old_df, new_df])
+        else:
+            df = new_df
+
+        start = time.time()
+        df.write_parquet(path)
+        delay: float = round(
+            time.time() - start,
+            ndigits=6,
+        )
+        log.info(
+            f'parquet write took {delay} secs\n'
+            f'file path: {path}'
+        )
+        return path
+
+    async def write_oi(
+        self,
+        fqme: str,
+        oi: np.ndarray,
+
+    ) -> Path:
+        '''
+        Write input oi time series for fqme and sampling period
+        to (local) disk.
+
+        '''
+        return self._write_oi(
+            fqme,
+            oi,
+        )
+
     async def delete_ts(
         self,
         key: str,