Add write_oi for open interest
The storage.nativedb mod now manages the parquet file writing; this is a rudimentary way to write to file using parquet, meant just for development purposes. Also add comments in max_pain to track the changes.
parent
b9321dbb49
commit
8f1e082c91
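For context, the whole write path in this commit boils down to polars' parquet round-trip. A minimal sketch of that development-grade flow (all values and the file path are illustrative, not from this commit):

import polars as pl

# build a tiny frame shaped like the oi records added below
df = pl.DataFrame({
    'time': [1700000000],
    'oi': [42.5],
    'oi_calc': [float('nan')],
})
df.write_parquet('/tmp/example.oi1s.parquet')  # hypothetical path
print(pl.read_parquet('/tmp/example.oi1s.parquet'))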
@@ -2,6 +2,8 @@
 from decimal import (
     Decimal,
 )
+import numpy as np
+import polars as pl
 import trio
 import tractor
 from datetime import datetime
@@ -10,6 +12,7 @@ from piker.brokers.deribit.api import (
     get_client,
     maybe_open_oi_feed,
 )
+from piker.storage import open_storage_client, StorageClient
 import sys
 import pyqtgraph as pg
 from PyQt6 import QtCore
@@ -163,6 +166,34 @@ async def max_pain_daemon(
             {option_type: open_interest}
         )
 
+    # Define the structured dtype
+    dtype = np.dtype([
+        ('time', int),
+        ('oi', float),
+        ('oi_calc', float),
+    ])
+    async def write_open_interest_on_file(msg: tuple, client: StorageClient):
+        if 'oi' == msg[0]:
+            nonlocal expiry_date
+            timestamp = msg[1]['timestamp']
+            strike_price = msg[1]["strike_price"]
+            option_type = msg[1]['option_type'].lower()
+            col_sym_key = f'btc-{expiry_date.lower()}-{strike_price}-{option_type}'
+
+            # Create the numpy array with sample data
+            data = np.array([
+                (
+                    int(timestamp),
+                    float(msg[1]['open_interest']),
+                    np.nan,
+                ),
+            ], dtype=dtype)
+
+            path = await client.write_oi(
+                col_sym_key,
+                data,
+            )
+
     def get_max_pain(
         oi_by_strikes: dict[str, dict[str, Decimal]]
     ) -> dict[str, str | Decimal]:
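As an aside, the structured dtype above gives each record named, typed fields. A self-contained look at one such record (the values are made up):

import numpy as np

dtype = np.dtype([('time', int), ('oi', float), ('oi_calc', float)])
row = np.array([(1700000000, 1250.0, np.nan)], dtype=dtype)
print(row['oi'][0])    # 1250.0: exchange-reported open interest
print(row['oi_calc'])  # [nan]: placeholder for a locally-computed value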
@@ -188,9 +219,13 @@ async def max_pain_daemon(
             'max_pain': max_pain,
         }
 
-    async with maybe_open_oi_feed(
-        instruments,
-    ) as oi_feed:
+    async with (
+        open_storage_client() as (_, storage),
+
+        maybe_open_oi_feed(
+            instruments,
+        ) as oi_feed,
+    ):
         # Initialize QApplication
         app = QApplication(sys.argv)
 
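The rewritten block uses the parenthesized multi-manager async with form, which requires Python 3.10+. A minimal runnable sketch with stand-in (hypothetical) managers:

from contextlib import asynccontextmanager
import trio

@asynccontextmanager
async def open_a():
    yield 'storage'

@asynccontextmanager
async def open_b():
    yield 'oi_feed'

async def main():
    # managers enter left-to-right and exit in reverse order
    async with (
        open_a() as storage,
        open_b() as oi_feed,
    ):
        print(storage, oi_feed)

trio.run(main)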
@@ -203,9 +238,21 @@ async def max_pain_daemon(
 
         async for msg in oi_feed:
 
+            # In-memory oi_by_strikes dict: all messages are filtered here
+            # and the dict is updated with the open interest data
             update_oi_by_strikes(msg)
+
+            # Write to file using the storage client
+            await write_open_interest_on_file(msg, storage)
+
+            # Max pain calcs: before starting we must gather all the open interest for
+            # all the strike prices and option types available for an expiration date
             if check_if_complete(oi_by_strikes):
                 if 'oi' == msg[0]:
+                    # Here we must read from the filesystem the latest open interest value
+                    # for each instrument of that specific expiration date, i.e. look up the
+                    # last update for the instrument btc-{expiry_date}-*.oi1s.parquet (1s
+                    # because the sample period is hardcoded for now, sorry.)
                     timestamp = msg[1]['timestamp']
                     max_pain = get_max_pain(oi_by_strikes)
                     intrinsic_values = get_total_intrinsic_values(oi_by_strikes)
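The read-back step those comments describe is not implemented in this commit. A hedged sketch of what it might look like (the helper name, column names and filename glob are assumptions mirroring the diff above):

from pathlib import Path
import polars as pl

def latest_oi_by_file(datadir: Path, expiry_date: str) -> dict[str, float]:
    # take the newest 'oi' row from every matching 1s parquet file
    latest: dict[str, float] = {}
    for path in datadir.glob(f'btc-{expiry_date.lower()}-*.oi1s.parquet'):
        df = pl.read_parquet(path).sort('time')
        latest[path.name] = df['oi'].to_list()[-1]
    return latest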
@@ -138,6 +138,16 @@ class StorageClient(
     ) -> None:
         ...
 
+    async def write_oi(
+        self,
+        fqme: str,
+        oi: np.ndarray,
+        append_and_duplicate: bool = True,
+        limit: int = int(800e3),
+
+    ) -> None:
+        ...
+
 
 class TimeseriesNotFound(Exception):
     '''
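Since the base class only stubs the signature, here is how the new method reads as a standalone typing.Protocol, just to show the call shape consumers code against (the OIWriter name is hypothetical, not in piker):

from typing import Protocol
import numpy as np

class OIWriter(Protocol):
    async def write_oi(
        self,
        fqme: str,
        oi: np.ndarray,
        append_and_duplicate: bool = True,
        limit: int = int(800e3),
    ) -> None:
        ...

Note that append_and_duplicate and limit are declared here but not (yet) consumed by the native implementation further below.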
@@ -111,6 +111,24 @@ def mk_ohlcv_shm_keyed_filepath(
     return path
 
 
+def mk_oi_shm_keyed_filepath(
+    fqme: str,
+    period: float | int,
+    datadir: Path,
+
+) -> Path:
+
+    if period < 1.:
+        raise ValueError('Sample period should be >= 1.!?')
+
+    path: Path = (
+        datadir
+        /
+        f'{fqme}.oi{int(period)}s.parquet'
+    )
+    return path
+
+
 def unpack_fqme_from_parquet_filepath(path: Path) -> str:
 
     filename: str = str(path.name)
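For reference, the naming scheme the new helper produces (the example args are hypothetical):

from pathlib import Path

datadir = Path('/tmp/piker/nativedb')  # stand-in data dir
fqme = 'btc-26sep25-100000-c'          # stand-in symbol key
period = 1
print(datadir / f'{fqme}.oi{int(period)}s.parquet')
# /tmp/piker/nativedb/btc-26sep25-100000-c.oi1s.parquet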
@@ -172,7 +190,11 @@ class NativeStorageClient:
 
         key: str = path.name.rstrip('.parquet')
         fqme, _, descr = key.rpartition('.')
-        prefix, _, suffix = descr.partition('ohlcv')
+        if 'ohlcv' in descr:
+            prefix, _, suffix = descr.partition('ohlcv')
+        elif 'oi' in descr:
+            prefix, _, suffix = descr.partition('oi')
+
         period: int = int(suffix.strip('s'))
 
         # cache description data
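Worked through on a sample key, the branch added above parses an oi file like so (the sample name is hypothetical; note that rstrip('.parquet') strips a character set, so this only holds while keys don't end in those characters):

key = 'btc-26sep25-100000-c.oi1s'   # path.name with the suffix stripped
fqme, _, descr = key.rpartition('.')
prefix, _, suffix = descr.partition('oi')
period = int(suffix.strip('s'))
print(fqme, period)  # btc-26sep25-100000-c 1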
@@ -369,6 +391,61 @@ class NativeStorageClient:
             timeframe,
         )
 
+    def _write_oi(
+        self,
+        fqme: str,
+        oi: np.ndarray,
+
+    ) -> Path:
+        '''
+        Sync version of the public interface meth, since we don't
+        currently actually need or support an async impl.
+
+        '''
+        path: Path = mk_oi_shm_keyed_filepath(
+            fqme=fqme,
+            period=1,
+            datadir=self._datadir,
+        )
+        if isinstance(oi, np.ndarray):
+            new_df: pl.DataFrame = tsp.np2pl(oi)
+        else:
+            new_df = oi
+
+        if path.exists():
+            old_df = pl.read_parquet(path)
+            df = pl.concat([old_df, new_df])
+        else:
+            df = new_df
+
+        start = time.time()
+        df.write_parquet(path)
+        delay: float = round(
+            time.time() - start,
+            ndigits=6,
+        )
+        log.info(
+            f'parquet write took {delay} secs\n'
+            f'file path: {path}'
+        )
+        return path
+
+    async def write_oi(
+        self,
+        fqme: str,
+        oi: np.ndarray,
+
+    ) -> Path:
+        '''
+        Write input oi time series for fqme and sampling period
+        to (local) disk.
+
+        '''
+        return self._write_oi(
+            fqme,
+            oi,
+        )
+
     async def delete_ts(
         self,
         key: str,
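The append strategy in _write_oi is read-everything, concat, rewrite: simple and durable for development (as the commit message says), but each write costs O(total rows), so it won't suit high-frequency production use. A standalone sketch of the same pattern without piker's tsp.np2pl helper (the field-by-field conversion below is an assumption standing in for it):

from pathlib import Path
import numpy as np
import polars as pl

def append_rows(path: Path, rows: np.ndarray) -> None:
    # convert the structured array field-by-field into a frame
    new_df = pl.DataFrame({name: rows[name] for name in rows.dtype.names})
    if path.exists():
        new_df = pl.concat([pl.read_parquet(path), new_df])
    new_df.write_parquet(path)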