Open interest storage #42

Open
ntorres wants to merge 2 commits from max_pain_storage into max_pain_chart
8 changed files with 760 additions and 484 deletions


@@ -51,7 +51,6 @@ stdenv.mkDerivation {
xorg.xcbutilrenderutil
# Python requirements.
python312Full
python312Packages.uv
python312Packages.qdarkstyle
python312Packages.rapidfuzz

examples/max_pain.py 100644 → 100755

@@ -2,6 +2,8 @@
from decimal import (
Decimal,
)
import numpy as np
import polars as pl
import trio
import tractor
from datetime import datetime
@@ -10,6 +12,7 @@ from piker.brokers.deribit.api import (
get_client,
maybe_open_oi_feed,
)
from piker.storage import open_storage_client, StorageClient
import sys
import pyqtgraph as pg
from PyQt6 import QtCore
@@ -163,6 +166,34 @@ async def max_pain_daemon(
{option_type: open_interest}
)
# Define the structured dtype
dtype = np.dtype([
('time', int),
('oi', float),
('oi_calc', float),
])
async def write_open_interest_on_file(msg: tuple, client: StorageClient):
if 'oi' == msg[0]:
nonlocal expiry_date
timestamp = msg[1]['timestamp']
strike_price = msg[1]["strike_price"]
option_type = msg[1]['option_type'].lower()
col_sym_key = f'btc-{expiry_date.lower()}-{strike_price}-{option_type}'
# Pack the update into a single structured-array row
data = np.array([
(
int(timestamp),
float(msg[1]['open_interest']),
np.nan,
),
], dtype=dtype)
path = await client.write_oi(
col_sym_key,
data,
)
def get_max_pain(
oi_by_strikes: dict[str, dict[str, Decimal]]
) -> dict[str, str | Decimal]:
@@ -188,9 +219,13 @@ async def max_pain_daemon(
'max_pain': max_pain,
}
async with maybe_open_oi_feed(
instruments,
) as oi_feed:
async with (
open_storage_client() as (_, storage),
maybe_open_oi_feed(
instruments,
) as oi_feed,
):
# Initialize QApplication
app = QApplication(sys.argv)
@@ -203,9 +238,21 @@ async def max_pain_daemon(
async for msg in oi_feed:
# In-memory oi_by_strikes dict: all messages are filtered here
# and the dict is updated with the open interest data
update_oi_by_strikes(msg)
# Write on file using storage client
await write_open_interest_on_file(msg, storage)
# Max pain calcs: before starting we must gather the open interest for
# all strike prices and option types available for an expiration date
if check_if_complete(oi_by_strikes):
if 'oi' == msg[0]:
# Here we must read from the filesystem the latest open interest
# value for each instrument for this specific expiration date: ie.
# look up the last update written to the instrument's
# btc-{expiry_date}-*.oi1s.parquet file (1s since the sample
# period is currently hardcoded).
timestamp = msg[1]['timestamp']
max_pain = get_max_pain(oi_by_strikes)
intrinsic_values = get_total_intrinsic_values(oi_by_strikes)

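For reference, a minimal standalone sketch (not part of this diff) of the write path the example wires up: one `oi` feed message becomes a single structured-array row appended to a per-instrument parquet file. The instrument key, timestamp and OI values below are invented for illustration; the dtype and `.oi1s.parquet` naming mirror the code above.

import numpy as np
import polars as pl

# same structured dtype as defined in examples/max_pain.py above
dtype = np.dtype([
    ('time', int),
    ('oi', float),
    ('oi_calc', float),
])

# one hypothetical update for a btc-26sep25-100000-c instrument
row = np.array(
    [(1_700_000_000, 1250.5, np.nan)],
    dtype=dtype,
)

# piker's tsp.np2pl() handles this conversion internally; plain
# polars can build the frame column-by-column from the struct array:
df = pl.DataFrame({name: row[name] for name in row.dtype.names})
df.write_parquet('btc-26sep25-100000-c.oi1s.parquet')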

@@ -92,7 +92,6 @@ class OptionPair(Pair, frozen=True):
strike: float # 5000.0
settlement_period: str # 'day'
settlement_currency: str # "BTC",
rfq: bool # false
price_index: str # 'btc_usd'
option_type: str # 'call'
min_trade_amount: float # 0.1


@@ -138,6 +138,16 @@ class StorageClient(
) -> None:
...
async def write_oi(
self,
fqme: str,
oi: np.ndarray,
append_and_duplicate: bool = True,
limit: int = int(800e3),
) -> Path:
...
class TimeseriesNotFound(Exception):
'''

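Because `StorageClient` only declares the method signature here (note the trailing `...`), any storage backend implementing a matching `write_oi()` can be swapped in behind the same call sites. A hedged sketch of code written against just the interface; the helper name and record values are invented:

import numpy as np
from piker.storage import StorageClient

async def persist_oi_update(
    client: StorageClient,
    fqme: str,
    timestamp: int,
    oi: float,
) -> None:
    # a single (time, oi, oi_calc) record; `oi_calc` is left
    # unset just like in the example script above
    rec = np.array(
        [(timestamp, oi, np.nan)],
        dtype=np.dtype([
            ('time', int),
            ('oi', float),
            ('oi_calc', float),
        ]),
    )
    # by default this appends (and may duplicate) rows, per the
    # `append_and_duplicate=True` default in the signature above
    await client.write_oi(fqme, rec)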

@@ -111,6 +111,24 @@ def mk_ohlcv_shm_keyed_filepath(
return path
def mk_oi_shm_keyed_filepath(
fqme: str,
period: float | int,
datadir: Path,
) -> Path:
if period < 1.:
raise ValueError('Sample period should be >= 1.!?')
path: Path = (
datadir
/
f'{fqme}.oi{int(period)}s.parquet'
)
return path
def unpack_fqme_from_parquet_filepath(path: Path) -> str:
filename: str = str(path.name)
@@ -172,7 +190,11 @@ class NativeStorageClient:
key: str = path.name.rstrip('.parquet')
fqme, _, descr = key.rpartition('.')
prefix, _, suffix = descr.partition('ohlcv')
if 'ohlcv' in descr:
prefix, _, suffix = descr.partition('ohlcv')
elif 'oi' in descr:
prefix, _, suffix = descr.partition('oi')
period: int = int(suffix.strip('s'))
# cache description data
@@ -369,6 +391,61 @@ class NativeStorageClient:
timeframe,
)
def _write_oi(
self,
fqme: str,
oi: np.ndarray,
) -> Path:
'''
Sync version of the public interface meth, since we don't
currently actually need or support an async impl.
'''
path: Path = mk_oi_shm_keyed_filepath(
fqme=fqme,
period=1,
datadir=self._datadir,
)
if isinstance(oi, np.ndarray):
new_df: pl.DataFrame = tsp.np2pl(oi)
else:
new_df = oi
if path.exists():
old_df = pl.read_parquet(path)
df = pl.concat([old_df, new_df])
else:
df = new_df
start = time.time()
df.write_parquet(path)
delay: float = round(
time.time() - start,
ndigits=6,
)
log.info(
f'parquet write took {delay} secs\n'
f'file path: {path}'
)
return path
async def write_oi(
self,
fqme: str,
oi: np.ndarray,
) -> Path:
'''
Write input oi time series for fqme and sampling period
to (local) disk.
'''
return self._write_oi(
fqme,
oi,
)
async def delete_ts(
self,
key: str,

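Since `_write_oi()` above only ever appends, readers must pick out the newest row per file themselves; a hedged sketch (the function name and glob pattern are assumptions, not part of this PR) of recovering the latest stored value per instrument, along the lines of the TODO comment in examples/max_pain.py:

from pathlib import Path
import polars as pl

def latest_oi_by_file(
    datadir: Path,
    pattern: str,  # eg. 'btc-26sep25-*.oi1s.parquet'
) -> dict[str, float]:
    '''
    Map each matching parquet file to its most recently
    written `oi` value.
    '''
    out: dict[str, float] = {}
    for path in datadir.glob(pattern):
        df = pl.read_parquet(path)
        # rows were appended in arrival order, so after sorting on
        # the `time` column the tail row is the latest update.
        out[path.name] = df.sort('time').tail(1)['oi'][0]
    return out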

@@ -75,6 +75,7 @@ dependencies = [
"tractor",
"asyncvnc",
"tomlkit",
"pyqtgraph>=0.12.3",
]
[project.optional-dependencies]

tach.toml 100644

@@ -0,0 +1,139 @@
interfaces = []
exclude = [
"**/*__pycache__",
"**/*egg-info",
"**/docs",
"**/tests",
"**/venv",
]
source_roots = [
".",
]
[[modules ]]
path = "piker.fsp._engine"
depends_on = ["piker.log", "piker.types", "piker.data", "piker", "piker.data._sampling", "piker.data._sharedmem", "piker.data.feed", "piker.fsp._api"]
[[modules ]]
path = "piker"
depends_on = ["piker.data._sharedmem", "piker.cli", "piker.data", "piker.types", "piker.data.feed", "piker.data._pathops", "piker.fsp", "piker.data.validate", "piker.storage.marketstore", "piker.log", "piker.data._symcache", "piker.data.ticktools", "piker.data._formatters", "piker.data._web_bs", "piker.tsp", "piker.config", "piker._cacheables"]
[[modules ]]
path = "piker.fsp"
depends_on = ["piker.fsp._api", "piker.fsp._volume", "piker.fsp._engine"]
[[modules ]]
path = "piker._cacheables"
depends_on = ["piker.log"]
[[modules ]]
path = "piker.data._formatters"
depends_on = ["piker.data._sharedmem", "piker.data._pathops"]
[[modules ]]
path = "piker.types"
depends_on = []
[[modules ]]
path = "piker.data._sharedmem"
depends_on = ["piker.data._util", "piker.data._source", "piker.types"]
[[modules ]]
path = "piker.storage.cli"
depends_on = ["piker.storage", "piker", "piker.data._formatters", "piker.cli", "piker.data", "piker.tsp"]
[[modules ]]
path = "piker.data.feed"
depends_on = ["piker.data.validate", "piker.data.ingest", "piker.types", "piker.data._util", "piker", "piker.data.flows", "piker.data._sampling", "piker.tsp"]
[[modules ]]
path = "piker.fsp._api"
depends_on = ["piker.log", "piker.data._sharedmem", "piker.fsp._momo", "piker.fsp._volume"]
[[modules ]]
path = "piker.storage"
depends_on = ["piker.data.feed", "piker", "piker.config", "piker.log"]
[[modules ]]
path = "piker.data.ticktools"
depends_on = []
[[modules ]]
path = "piker.data._sampling"
depends_on = ["piker.data.ticktools", "piker.data._util", "piker.data._sharedmem", "piker"]
[[modules ]]
path = "piker.tsp._anal"
depends_on = ["piker.log", "piker"]
[[modules ]]
path = "piker.data.ingest"
depends_on = ["piker.data._util"]
[[modules ]]
path = "piker.config"
depends_on = ["piker.log"]
[[modules ]]
path = "piker.data.validate"
depends_on = ["piker.data._util", "piker", "piker.types"]
[[modules ]]
path = "piker.fsp._momo"
depends_on = ["piker.fsp._api", "piker.data", "piker.data._sharedmem"]
[[modules ]]
path = "piker.storage.marketstore"
depends_on = ["piker.log", "piker"]
[[modules ]]
path = "piker.cli"
depends_on = ["piker.storage.cli", "piker.log", "piker.config", "piker"]
[[modules ]]
path = "piker.data._symcache"
depends_on = ["piker.log", "piker.types", "piker.config", "piker"]
[[modules ]]
path = "piker.fsp._volume"
depends_on = ["piker.data", "piker.fsp._momo", "piker.fsp._api", "piker.log", "piker.data._sharedmem"]
[[modules ]]
path = "piker.data.flows"
depends_on = ["piker.types", "piker", "piker.data._sharedmem"]
[[modules ]]
path = "piker.data._pathops"
depends_on = ["piker.data._m4"]
[[modules ]]
path = "piker.data"
depends_on = ["piker.data.flows", "piker.data._sampling", "piker.data.feed", "piker.data.ticktools", "piker.types", "piker.data._sharedmem", "piker.data._symcache", "piker.data._source"]
[[modules ]]
path = "piker.storage.nativedb"
depends_on = ["piker.tsp", "piker.config", "piker.data", "piker.log", "piker.storage"]
[[modules ]]
path = "piker.data._m4"
depends_on = ["piker.data._util"]
[[modules ]]
path = "piker.tsp"
depends_on = ["piker.data._sampling", "piker.data._util", "piker.tsp._anal", "piker.storage", "piker.data._sharedmem", "piker", "piker.data._source"]
[[modules ]]
path = "piker.data._source"
depends_on = []
[[modules ]]
path = "piker.log"
depends_on = []
[[modules ]]
path = "piker.data._util"
depends_on = ["piker.log"]
[[modules ]]
path = "piker.data._web_bs"
depends_on = ["piker.types", "piker.data._util"]

uv.lock

File diff suppressed because it is too large.