Compare commits

..

No commits in common. "5de14bc3f9a50b41bc3b07036eb292b79617b1dd" and "564cd63014883f09b96611ac3e552531fb39ef4a" have entirely different histories.

2 changed files with 28 additions and 197 deletions

View File

@ -1,27 +0,0 @@
#!/usr/bin/env python
import trio
import tractor
from piker.brokers.deribit.api import (
maybe_open_oi_feed,
)
async def max_pain_daemon(
) -> None:
async with maybe_open_oi_feed() as oi_feed:
print('Im in...')
async def main():
async with tractor.open_nursery() as n:
p: tractor.Portal = await n.start_actor(
'max_pain_daemon',
enable_modules=[__name__],
infect_asyncio=True,
)
await p.run(max_pain_daemon)
if __name__ == '__main__':
trio.run(main)

View File

@ -112,10 +112,6 @@ def deribit_timestamp(when: datetime) -> int:
) )
def get_timestamp_int(expiry_date: str) -> int:
return int(time.mktime(time.strptime(expiry_date, '%d%b%y')))
def str_to_cb_sym(name: str) -> Symbol: def str_to_cb_sym(name: str) -> Symbol:
base, strike_price, expiry_date, option_type = name.split('-') base, strike_price, expiry_date, option_type = name.split('-')
@ -128,9 +124,8 @@ def str_to_cb_sym(name: str) -> Symbol:
else: else:
raise Exception("Couldn\'t parse option type") raise Exception("Couldn\'t parse option type")
new_expiry_date: int = get_timestamp_int( new_expiry_date = get_values_from_cb_normalized_date(expiry_date)
get_values_from_cb_normalized_date(expiry_date)
)
return Symbol( return Symbol(
base=base, base=base,
quote=quote, quote=quote,
@ -150,12 +145,11 @@ def piker_sym_to_cb_sym(name: str) -> Symbol:
)= tuple( )= tuple(
name.upper().split('-')) name.upper().split('-'))
new_expiry_date = get_timestamp_int(expiry_date)
quote: str = base quote: str = base
if option_type == 'P' or option_type == 'PUT': if option_type == 'P':
option_type = PUT option_type = PUT
elif option_type == 'C' or option_type == 'CALL': elif option_type == 'C':
option_type = CALL option_type = CALL
else: else:
raise Exception("Couldn\'t parse option type") raise Exception("Couldn\'t parse option type")
@ -166,7 +160,7 @@ def piker_sym_to_cb_sym(name: str) -> Symbol:
type=OPTION, type=OPTION,
strike_price=strike_price, strike_price=strike_price,
option_type=option_type, option_type=option_type,
expiry_date=new_expiry_date expiry_date=expiry_date
) )
@ -242,7 +236,6 @@ def get_config() -> dict[str, Any]:
section['log'] = {} section['log'] = {}
section['log']['filename'] = 'feedhandler.log' section['log']['filename'] = 'feedhandler.log'
section['log']['level'] = 'DEBUG' section['log']['level'] = 'DEBUG'
section['log']['disabled'] = True
return section return section
@ -320,20 +313,6 @@ class Client:
return balances return balances
async def get_currencies(
self,
) -> list[dict]:
'''
Return the set of currencies for deribit.
'''
assets = {}
resp = await self._json_rpc_auth_wrapper(
'public/get_currencies',
params={}
)
return resp.result
async def get_assets( async def get_assets(
self, self,
venue: str | None = None, venue: str | None = None,
@ -346,7 +325,11 @@ class Client:
''' '''
assets = {} assets = {}
currencies = await self.get_currencies() resp = await self._json_rpc_auth_wrapper(
'public/get_currencies',
params={}
)
currencies: list[dict] = resp.result
for currency in currencies: for currency in currencies:
name: str = currency['currency'] name: str = currency['currency']
tx_tick: Decimal = digits_to_dec(currency['fee_precision']) tx_tick: Decimal = digits_to_dec(currency['fee_precision'])
@ -383,7 +366,6 @@ class Client:
currency: str = 'btc', currency: str = 'btc',
kind: str = 'option', kind: str = 'option',
expired: bool = False, expired: bool = False,
expiry_date: str = None,
) -> list[Symbol]: ) -> list[Symbol]:
""" """
@ -402,10 +384,8 @@ class Client:
resp = r.result resp = r.result
response_list = [] response_list = []
for i in range(len(resp)): for i in range(len(resp) // 10):
element = resp[i] element = resp[i]
name = f'{element["instrument_name"].split("-")[1]}'
if not expiry_date or name == expiry_date.upper():
response_list.append(piker_sym_to_cb_sym(element['instrument_name'])) response_list.append(piker_sym_to_cb_sym(element['instrument_name']))
return response_list return response_list
@ -792,10 +772,6 @@ async def maybe_open_price_feed(
async def aio_open_interest_feed_relay( async def aio_open_interest_feed_relay(
fh: FeedHandler, fh: FeedHandler,
instruments: list, instruments: list,
open_interests: dict[str, dict[str, dict[str, list[dict[str, Decimal]]]]],
losses_cache: dict[str, Decimal],
max_losses: Decimal,
max_pain: Decimal,
from_trio: asyncio.Queue, from_trio: asyncio.Queue,
to_trio: trio.abc.SendChannel, to_trio: trio.abc.SendChannel,
) -> None: ) -> None:
@ -807,6 +783,12 @@ async def aio_open_interest_feed_relay(
Proxy-thru `cryptofeed.FeedHandler` "trades" to `piker`-side. Proxy-thru `cryptofeed.FeedHandler` "trades" to `piker`-side.
''' '''
# Get timestamp and convert it to isoformat
date = (datetime.utcfromtimestamp(trade.timestamp)).isoformat()
print('Trade...')
print(date)
print(trade)
print('=======================')
to_trio.send_nowait(('trade', trade)) to_trio.send_nowait(('trade', trade))
# trade and oi are user defined functions that # trade and oi are user defined functions that
@ -820,88 +802,18 @@ async def aio_open_interest_feed_relay(
Proxy-thru `cryptofeed.FeedHandler` "oi" to `piker`-side. Proxy-thru `cryptofeed.FeedHandler` "oi" to `piker`-side.
''' '''
nonlocal losses_cache # Get timestamp and convert it to isoformat
nonlocal max_losses date = (datetime.utcfromtimestamp(oi.timestamp)).isoformat()
nonlocal max_pain print('>>>> Open Interest...')
nonlocal open_interests print(date)
symbol: Symbol = str_to_cb_sym(oi.symbol) print(oi)
piker_sym: str = cb_sym_to_deribit_inst(symbol) print('==========================')
data: dict = oi.raw['params']['data'] to_trio.send_nowait(('oi', oi))
(
base,
expiry_date,
strike_price,
option_type
) = tuple(
piker_sym.split('-')
)
if not f'{expiry_date}' in open_interests:
open_interests[f'{expiry_date}'] = {
f'{strike_price}': {
'C': [],
'P': [],
'strike_losses': Decimal(0),
}
}
if not f'{strike_price}' in open_interests[f'{expiry_date}']:
open_interests[f'{expiry_date}'][f'{strike_price}'] = {
'C': [],
'P': [],
'strike_losses': Decimal(0),
}
index_price: Decimal = data['index_price']
price_delta: Decimal = abs(index_price - Decimal(strike_price))
open_interest: Decimal = oi.open_interest
losses: Decimal = price_delta * open_interest
if not f'{strike_price}' in losses_cache:
losses_cache[f'{strike_price}'] = {
'C': Decimal(0),
'P': Decimal(0),
}
losses_cache[f'{strike_price}'][f'{option_type}'] = losses
strike_losses: Decimal = (
losses_cache[f'{strike_price}']['C']
+
losses_cache[f'{strike_price}']['P']
)
print(f'>>>> Open Interest...')
print(f'max_losses: {max_losses}\n')
print(f'max_pain: {max_pain}')
print('-----------------------------------------------')
open_interests[f'{expiry_date}'][f'{strike_price}'][f'{option_type}'] = {
'date': oi.timestamp,
'open_interest': open_interest,
'index_price': index_price,
'strike_price': strike_price,
'price_delta': price_delta,
'losses': losses, # this updates the global value call_losses and put_losses
}
# calculate with latest values stored in call_losses and put_losses global cache.
open_interests[f'{expiry_date}'][f'{strike_price}']['strike_losses'] = strike_losses
for strike in open_interests[f'{expiry_date}']:
if open_interests[f'{expiry_date}'][strike]['strike_losses'] > max_losses:
max_losses = open_interests[f'{expiry_date}'][strike]['strike_losses']
max_pain = strike
print('-----------------------------------------------')
print(f'strike_price: {strike_price}')
print(f'strike_losses: {open_interests[f'{expiry_date}'][strike]['strike_losses']}')
print(f'{pformat(open_interests[f'{expiry_date}'][strike])}')
print('-----------------------------------------------')
channels = [TRADES, OPEN_INTEREST]
callbacks={TRADES: _trade, OPEN_INTEREST: _oi}
callbacks = {TRADES: _trade, OPEN_INTEREST: _oi}
fh.add_feed( fh.add_feed(
DERIBIT, DERIBIT,
channels=channels, channels=[TRADES, OPEN_INTEREST],
symbols=instruments, symbols=instruments,
callbacks=callbacks callbacks=callbacks
) )
@ -919,60 +831,6 @@ async def aio_open_interest_feed_relay(
await asyncio.sleep(float('inf')) await asyncio.sleep(float('inf'))
@acm
async def open_oi_feed(
) -> to_asyncio.LinkedTaskChannel:
expiry_date: str = '6DEC24' # '6DEC24' '26SEP25' '27JUN25' '13DEC24'
instruments: list[Symbol] = []
async with get_client(
) as client:
# to get all currencies available in deribit
# currencies = await client.get_currencies()
instruments = await client.get_instruments(
expiry_date=expiry_date,
)
losses_cache: dict[str, Decimal] = { # {'<strike_price>': <value>}
'C': Decimal(0),
'P': Decimal(0),
}
max_losses: Decimal = Decimal(0)
max_pain: Decimal = Decimal(0)
open_interests: dict[str, dict[str, dict[str, list[dict]]]] = {}
fh: FeedHandler
first: None
chan: to_asyncio.LinkedTaskChannel
async with (
maybe_open_feed_handler() as fh,
to_asyncio.open_channel_from(
partial(
aio_open_interest_feed_relay,
fh,
instruments,
open_interests,
losses_cache,
max_losses,
max_pain,
)
) as (first, chan)
):
yield chan
@acm
async def maybe_open_oi_feed(
) -> trio.abc.ReceiveStream:
# TODO: add a predicate to maybe_open_context
feed: to_asyncio.LinkedTaskChannel
async with maybe_open_context(
acm_func=open_oi_feed,
) as (cache_hit, feed):
if cache_hit:
yield broadcast_receiver(feed, 10)
else:
yield feed
# TODO, move all to `.broker` submod! # TODO, move all to `.broker` submod!
# async def aio_order_feed_relay( # async def aio_order_feed_relay(