#!/usr/bin/env python
'''
Max-pain daemon example for Deribit options.

Asks for an expiration date on stdin, subscribes to the open interest
(OI) feed of every instrument returned for that date, writes each OI
update through the piker storage client and, once both the call and
put OI are known for every strike, prints the max-pain strike and
redraws a pyqtgraph bar chart of OI by strike.

'''
from datetime import datetime
from decimal import (
    Decimal,
)
from pathlib import Path
import sys
# from pprint import pformat

import numpy as np
# import polars as pl
import trio
import tractor
import pyqtgraph as pg
from PyQt6 import QtCore
from pyqtgraph import ScatterPlotItem, InfiniteLine
from PyQt6.QtWidgets import QApplication
from cryptofeed.symbols import Symbol

from piker.brokers.deribit.api import (
    get_client,
    maybe_open_oi_feed,
)
from piker.storage import open_storage_client, StorageClient
from piker.log import get_logger

log = get_logger(__name__)

# XXX, use 2 newlines between top level LOC (even between these
# imports and the next function line ;)


def check_if_complete(
    oi: dict[str, dict[str, Decimal | None]]
) -> bool:
    '''
    Return `True` when every strike in `oi` holds a non-`None` value
    under both the `'C'` and the `'P'` key.

    A strike entry which lacks one of the two keys is reported as
    incomplete instead of raising `KeyError`. An empty `oi` yields
    `True` (the semantics of `all()` over nothing).

    '''
    return all(
        sides.get('C') is not None
        and
        sides.get('P') is not None
        for sides in oi.values()
    )


async def max_pain_daemon(
) -> None:
    '''
    Run the interactive max-pain loop for one expiration date.

    The expiration date is read from stdin; the function then consumes
    the OI feed until the feed ends or the task is cancelled.

    '''
    currency: str = 'btc'
    kind: str = 'option'

    async with get_client(
    ) as client:
        expiry_dates: list[str] = await client.get_expiration_dates(
            currency=currency,
            kind=kind
        )

        print(f'Available expiration dates for {currency}-{kind}:')
        print(f'{expiry_dates}')
        # NOTE(review): `input()` is a blocking call made from inside
        # a coroutine; nothing else in this task runs until the user
        # answers.
        expiry_date: str = input(
            'Please enter a valid expiration date: '
        ).upper()
        print('Starting little daemon...')

        instruments: list[Symbol] = await client.get_instruments(
            expiry_date=expiry_date,
        )
        # strike -> {'C': call OI, 'P': put OI}
        # presumably every value starts out as `None` until the feed
        # delivers it (see `check_if_complete()`) - TODO confirm
        # against `get_strikes_dict()`.
        oi_by_strikes: dict[str, dict[str, Decimal | None]] = (
            client.get_strikes_dict(instruments)
        )

    def get_total_intrinsic_values(
        oi_by_strikes: dict[str, dict[str, Decimal]]
    ) -> dict[str, dict[str, Decimal]]:
        '''
        For every strike taken as the candidate settlement price,
        sum the clamped-at-zero cash value of all calls and all puts.

        Returns a mapping keyed exactly like `oi_by_strikes` whose
        values hold the `'C'`, `'P'` and `'total'` sums.

        '''
        zero = Decimal(0)

        # Convert each key once and keep it paired with its OI, so no
        # value ever has to be looked up again through a
        # `str(Decimal(key))` round-trip (which breaks for any key
        # whose textual form is not preserved by `Decimal`).
        legs: list[tuple[Decimal, Decimal, Decimal]] = [
            (Decimal(strike), oi['C'], oi['P'])
            for strike, oi in oi_by_strikes.items()
        ]

        intrinsic_values: dict[str, dict[str, Decimal]] = {}
        for strike in oi_by_strikes:
            s = Decimal(strike)
            # `zero` as clamp and as `sum()` start keeps the results
            # `Decimal` even when every term is clamped.
            call_cash: Decimal = sum(
                (max(zero, (s - c) * call_oi) for c, call_oi, _ in legs),
                zero,
            )
            put_cash: Decimal = sum(
                (max(zero, (c - s) * put_oi) for c, _, put_oi in legs),
                zero,
            )
            intrinsic_values[strike] = {
                'C': call_cash,
                'P': put_cash,
                'total': call_cash + put_cash,
            }

        return intrinsic_values

    def get_intrinsic_value_and_max_pain(
        intrinsic_values: dict[str, dict[str, Decimal]]
    ) -> tuple[Decimal, Decimal]:
        '''
        Return `(lowest total intrinsic value, strike it occurs at)`.

        On ties the first strike in iteration order wins. For an empty
        mapping the result is `(Decimal('Infinity'), Decimal(0))`.

        '''
        # We need to find the lowest value, so we start at infinity
        # to ensure that, and the max_pain must be an amount greater
        # than zero.
        total_intrinsic_value: Decimal = Decimal('Infinity')
        max_pain: Decimal = Decimal(0)

        for strike, values in intrinsic_values.items():
            if values['total'] < total_intrinsic_value:
                total_intrinsic_value = values['total']
                max_pain = Decimal(strike)

        return total_intrinsic_value, max_pain

    def plot_graph(
        oi_by_strikes: dict[str, dict[str, Decimal]],
        plot,
    ):
        '''
        Update the bar graph with new open interest data.

        Clears `plot`, then for every strike adds a call bar (blue), a
        put bar (red) and a green marker for the total intrinsic value
        divided by 100000, and finally a dotted vertical line at the
        max-pain strike.

        '''
        plot.clear()

        intrinsic_values = get_total_intrinsic_values(oi_by_strikes)

        for strike_str in sorted(oi_by_strikes, key=int):
            strike = int(strike_str)
            calls_val = float(oi_by_strikes[strike_str]['C'])
            puts_val = float(oi_by_strikes[strike_str]['P'])

            # The two bars sit side by side around the strike.
            bar_c = pg.BarGraphItem(
                x=[strike - 100],
                height=[calls_val],
                width=200,
                pen='w',
                brush=(0, 0, 255, 150)
            )
            plot.addItem(bar_c)

            bar_p = pg.BarGraphItem(
                x=[strike + 100],
                height=[puts_val],
                width=200,
                pen='w',
                brush=(255, 0, 0, 150)
            )
            plot.addItem(bar_p)

            total_val = float(intrinsic_values[strike_str]['total']) / 100000

            scatter_iv = ScatterPlotItem(
                x=[strike],
                y=[total_val],
                pen=pg.mkPen(color=(0, 255, 0), width=2),
                brush=pg.mkBrush(0, 255, 0, 150),
                size=3,
                symbol='o'
            )
            plot.addItem(scatter_iv)

        _, max_pain = get_intrinsic_value_and_max_pain(intrinsic_values)

        vertical_line = InfiniteLine(
            pos=max_pain,
            angle=90,
            pen=pg.mkPen(
                color='yellow',
                width=1,
                style=QtCore.Qt.PenStyle.DotLine,
            ),
            label=f'Max pain: {max_pain:,.0f}',
            labelOpts={
                'position': 0.85,
                'color': 'yellow',
                'movable': True
            }
        )
        plot.addItem(vertical_line)

    def update_oi_by_strikes(msg: tuple):
        '''
        Fold one `'oi'` feed message into the in-memory
        `oi_by_strikes` mapping; any other message is ignored.

        '''
        if msg[0] != 'oi':
            return

        payload = msg[1]
        # Mutates the enclosing mapping in place (no rebinding).
        oi_by_strikes.setdefault(
            payload['strike_price'], {}
        ).update(
            {payload['option_type']: payload['open_interest']}
        )

    # Define the structured dtype
    oi_dtype = np.dtype([
        ('time', int),
        ('oi', float),
        ('oi_calc', float),
    ])

    async def write_open_interest_on_file(
        msg: tuple,
        client: StorageClient,
    ):
        '''
        Persist one `'oi'` feed message through `client.write_oi()`;
        any other message is ignored.

        '''
        if msg[0] != 'oi':
            return

        payload = msg[1]
        timestamp = payload['timestamp']
        strike_price = payload['strike_price']
        option_type = payload['option_type'].lower()
        col_sym_key = (
            f'{currency}-{expiry_date.lower()}-{strike_price}-{option_type}'
        )

        # One-row array; `oi_calc` is left as NaN here.
        data = np.array([
            (
                int(timestamp),
                float(payload['open_interest']),
                np.nan,
            ),
        ], dtype=oi_dtype)

        path: Path = await client.write_oi(
            col_sym_key,
            data,
        )
        # TODO, use std logging like this throughout for status
        # emissions on console!
        log.info(f'Wrote OI history to {path}')

    def get_max_pain(
        oi_by_strikes: dict[str, dict[str, Decimal]],
        timestamp,
    ) -> dict[str, str | Decimal]:
        '''
        This method requires only the strike_prices and oi for call
        and puts, the closes list are the same as the strike_prices
        the idea is to sum all the calls and puts cash for each strike
        and the ITM strikes from that strike, the lowest value is what
        we are looking for the intrinsic value.

        `timestamp` is copied as-is into the result, next to the
        expiry date chosen at startup.

        '''
        intrinsic_values = get_total_intrinsic_values(oi_by_strikes)
        (
            total_intrinsic_value,
            max_pain,
        ) = get_intrinsic_value_and_max_pain(intrinsic_values)

        return {
            'timestamp': timestamp,
            'expiry_date': expiry_date,
            'total_intrinsic_value': total_intrinsic_value,
            'max_pain': max_pain,
        }

    async with (
        open_storage_client() as (_, storage),

        maybe_open_oi_feed(
            instruments,
        ) as oi_feed,
    ):
        # Initialize QApplication
        app = QApplication(sys.argv)

        win = pg.GraphicsLayoutWidget(show=True)
        win.setWindowTitle('Calls (blue) vs Puts (red)')

        plot = win.addPlot(title='OI by Strikes')
        plot.showGrid(x=True, y=True)
        print('Plot initialized...')

        async for msg in oi_feed:

            # In memory oi_by_strikes dict, all message are filtered
            # here and the dict is updated with the open interest data
            update_oi_by_strikes(msg)

            # Write on file using storage client
            await write_open_interest_on_file(msg, storage)

            # Max pain calcs, before start we must gather all the open
            # interest for all the strike prices and option types
            # available for a expiration date
            if (
                msg[0] == 'oi'
                and
                check_if_complete(oi_by_strikes)
            ):
                # Here we must read for the filesystem all the latest
                # open interest value for each instrument for that
                # specific expiration date, that means look up for the
                # last update got the instrument
                # btc-{expity_date}-*oi1s.parquet (1s because is
                # hardcoded to something, sorry.)
                timestamp = msg[1]['timestamp']
                max_pain = get_max_pain(oi_by_strikes, timestamp)
                # intrinsic_values = get_total_intrinsic_values(oi_by_strikes)

                # graph here
                plot_graph(oi_by_strikes, plot)

                # One `print()` of a single concatenated string: fewer
                # calls and the text can be passed around if needed.
                # The replacement fields use double quotes so the
                # f-strings also parse on Python < 3.12.
                #
                # TODO, right-side alignment of the values, maybe via
                # https://docs.python.org/3/library/textwrap.html#textwrap.indent
                stamp = datetime.fromtimestamp(max_pain['timestamp'])
                print(
                    '-----------------------------------------------\n'
                    f'timestamp: {stamp}\n'
                    f'expiry_date: {max_pain["expiry_date"]}\n'
                    f'max_pain: {max_pain["max_pain"]:,.0f}\n'
                    'total intrinsic value: '
                    f'{max_pain["total_intrinsic_value"]:,.0f}\n'
                    '-----------------------------------------------'
                )

            # Process GUI events to keep the window responsive
            app.processEvents()


async def main() -> None:
    '''
    Spawn the `max_pain_daemon` actor and run the daemon inside it.

    '''
    async with tractor.open_nursery(
        debug_mode=True,
        loglevel='info',
    ) as an:
        # Aliased so the module-level (piker) `log` is not shadowed.
        from tractor import log as tractor_log
        tractor_log.get_console_log(level='info')

        ptl: tractor.Portal = await an.start_actor(
            'max_pain_daemon',
            enable_modules=[__name__],
            infect_asyncio=True,
            # ^TODO, we can actually run this in the root-actor now
            # if needed as per 2nd "section" in,
            # https://pikers.dev/goodboy/tractor/pulls/2
            #
            # NOTE, will first require us porting to modern
            # `tractor:main` though ofc!
        )
        await ptl.run(max_pain_daemon)


if __name__ == '__main__':
    trio.run(main)