Compare commits

..

No commits in common. "5de14bc3f9a50b41bc3b07036eb292b79617b1dd" and "564cd63014883f09b96611ac3e552531fb39ef4a" have entirely different histories.

2 changed files with 28 additions and 197 deletions

View File

@ -1,27 +0,0 @@
#!/usr/bin/env python
import trio
import tractor
from piker.brokers.deribit.api import (
maybe_open_oi_feed,
)
async def max_pain_daemon(
) -> None:
async with maybe_open_oi_feed() as oi_feed:
print('Im in...')
async def main():
async with tractor.open_nursery() as n:
p: tractor.Portal = await n.start_actor(
'max_pain_daemon',
enable_modules=[__name__],
infect_asyncio=True,
)
await p.run(max_pain_daemon)
if __name__ == '__main__':
trio.run(main)

View File

@ -112,10 +112,6 @@ def deribit_timestamp(when: datetime) -> int:
)
def get_timestamp_int(expiry_date: str) -> int:
return int(time.mktime(time.strptime(expiry_date, '%d%b%y')))
def str_to_cb_sym(name: str) -> Symbol:
base, strike_price, expiry_date, option_type = name.split('-')
@ -128,9 +124,8 @@ def str_to_cb_sym(name: str) -> Symbol:
else:
raise Exception("Couldn\'t parse option type")
new_expiry_date: int = get_timestamp_int(
get_values_from_cb_normalized_date(expiry_date)
)
new_expiry_date = get_values_from_cb_normalized_date(expiry_date)
return Symbol(
base=base,
quote=quote,
@ -150,12 +145,11 @@ def piker_sym_to_cb_sym(name: str) -> Symbol:
)= tuple(
name.upper().split('-'))
new_expiry_date = get_timestamp_int(expiry_date)
quote: str = base
if option_type == 'P' or option_type == 'PUT':
if option_type == 'P':
option_type = PUT
elif option_type == 'C' or option_type == 'CALL':
elif option_type == 'C':
option_type = CALL
else:
raise Exception("Couldn\'t parse option type")
@ -166,7 +160,7 @@ def piker_sym_to_cb_sym(name: str) -> Symbol:
type=OPTION,
strike_price=strike_price,
option_type=option_type,
expiry_date=new_expiry_date
expiry_date=expiry_date
)
@ -242,7 +236,6 @@ def get_config() -> dict[str, Any]:
section['log'] = {}
section['log']['filename'] = 'feedhandler.log'
section['log']['level'] = 'DEBUG'
section['log']['disabled'] = True
return section
@ -320,20 +313,6 @@ class Client:
return balances
async def get_currencies(
self,
) -> list[dict]:
'''
Return the set of currencies for deribit.
'''
assets = {}
resp = await self._json_rpc_auth_wrapper(
'public/get_currencies',
params={}
)
return resp.result
async def get_assets(
self,
venue: str | None = None,
@ -346,7 +325,11 @@ class Client:
'''
assets = {}
currencies = await self.get_currencies()
resp = await self._json_rpc_auth_wrapper(
'public/get_currencies',
params={}
)
currencies: list[dict] = resp.result
for currency in currencies:
name: str = currency['currency']
tx_tick: Decimal = digits_to_dec(currency['fee_precision'])
@ -383,7 +366,6 @@ class Client:
currency: str = 'btc',
kind: str = 'option',
expired: bool = False,
expiry_date: str = None,
) -> list[Symbol]:
"""
@ -402,10 +384,8 @@ class Client:
resp = r.result
response_list = []
for i in range(len(resp)):
for i in range(len(resp) // 10):
element = resp[i]
name = f'{element["instrument_name"].split("-")[1]}'
if not expiry_date or name == expiry_date.upper():
response_list.append(piker_sym_to_cb_sym(element['instrument_name']))
return response_list
@ -792,10 +772,6 @@ async def maybe_open_price_feed(
async def aio_open_interest_feed_relay(
fh: FeedHandler,
instruments: list,
open_interests: dict[str, dict[str, dict[str, list[dict[str, Decimal]]]]],
losses_cache: dict[str, Decimal],
max_losses: Decimal,
max_pain: Decimal,
from_trio: asyncio.Queue,
to_trio: trio.abc.SendChannel,
) -> None:
@ -807,6 +783,12 @@ async def aio_open_interest_feed_relay(
Proxy-thru `cryptofeed.FeedHandler` "trades" to `piker`-side.
'''
# Get timestamp and convert it to isoformat
date = (datetime.utcfromtimestamp(trade.timestamp)).isoformat()
print('Trade...')
print(date)
print(trade)
print('=======================')
to_trio.send_nowait(('trade', trade))
# trade and oi are user defined functions that
@ -820,88 +802,18 @@ async def aio_open_interest_feed_relay(
Proxy-thru `cryptofeed.FeedHandler` "oi" to `piker`-side.
'''
nonlocal losses_cache
nonlocal max_losses
nonlocal max_pain
nonlocal open_interests
symbol: Symbol = str_to_cb_sym(oi.symbol)
piker_sym: str = cb_sym_to_deribit_inst(symbol)
data: dict = oi.raw['params']['data']
(
base,
expiry_date,
strike_price,
option_type
) = tuple(
piker_sym.split('-')
)
if not f'{expiry_date}' in open_interests:
open_interests[f'{expiry_date}'] = {
f'{strike_price}': {
'C': [],
'P': [],
'strike_losses': Decimal(0),
}
}
if not f'{strike_price}' in open_interests[f'{expiry_date}']:
open_interests[f'{expiry_date}'][f'{strike_price}'] = {
'C': [],
'P': [],
'strike_losses': Decimal(0),
}
# Get timestamp and convert it to isoformat
date = (datetime.utcfromtimestamp(oi.timestamp)).isoformat()
print('>>>> Open Interest...')
print(date)
print(oi)
print('==========================')
to_trio.send_nowait(('oi', oi))
index_price: Decimal = data['index_price']
price_delta: Decimal = abs(index_price - Decimal(strike_price))
open_interest: Decimal = oi.open_interest
losses: Decimal = price_delta * open_interest
if not f'{strike_price}' in losses_cache:
losses_cache[f'{strike_price}'] = {
'C': Decimal(0),
'P': Decimal(0),
}
losses_cache[f'{strike_price}'][f'{option_type}'] = losses
strike_losses: Decimal = (
losses_cache[f'{strike_price}']['C']
+
losses_cache[f'{strike_price}']['P']
)
print(f'>>>> Open Interest...')
print(f'max_losses: {max_losses}\n')
print(f'max_pain: {max_pain}')
print('-----------------------------------------------')
open_interests[f'{expiry_date}'][f'{strike_price}'][f'{option_type}'] = {
'date': oi.timestamp,
'open_interest': open_interest,
'index_price': index_price,
'strike_price': strike_price,
'price_delta': price_delta,
'losses': losses, # this updates the global value call_losses and put_losses
}
# calculate with latest values stored in call_losses and put_losses global cache.
open_interests[f'{expiry_date}'][f'{strike_price}']['strike_losses'] = strike_losses
for strike in open_interests[f'{expiry_date}']:
if open_interests[f'{expiry_date}'][strike]['strike_losses'] > max_losses:
max_losses = open_interests[f'{expiry_date}'][strike]['strike_losses']
max_pain = strike
print('-----------------------------------------------')
print(f'strike_price: {strike_price}')
print(f'strike_losses: {open_interests[f'{expiry_date}'][strike]['strike_losses']}')
print(f'{pformat(open_interests[f'{expiry_date}'][strike])}')
print('-----------------------------------------------')
channels = [TRADES, OPEN_INTEREST]
callbacks = {TRADES: _trade, OPEN_INTEREST: _oi}
fh.add_feed(
DERIBIT,
channels=channels,
channels=[TRADES, OPEN_INTEREST],
symbols=instruments,
callbacks=callbacks
)
@ -919,60 +831,6 @@ async def aio_open_interest_feed_relay(
await asyncio.sleep(float('inf'))
@acm
async def open_oi_feed(
) -> to_asyncio.LinkedTaskChannel:
expiry_date: str = '6DEC24' # '6DEC24' '26SEP25' '27JUN25' '13DEC24'
instruments: list[Symbol] = []
async with get_client(
) as client:
# to get all currencies available in deribit
# currencies = await client.get_currencies()
instruments = await client.get_instruments(
expiry_date=expiry_date,
)
losses_cache: dict[str, Decimal] = { # {'<strike_price>': <value>}
'C': Decimal(0),
'P': Decimal(0),
}
max_losses: Decimal = Decimal(0)
max_pain: Decimal = Decimal(0)
open_interests: dict[str, dict[str, dict[str, list[dict]]]] = {}
fh: FeedHandler
first: None
chan: to_asyncio.LinkedTaskChannel
async with (
maybe_open_feed_handler() as fh,
to_asyncio.open_channel_from(
partial(
aio_open_interest_feed_relay,
fh,
instruments,
open_interests,
losses_cache,
max_losses,
max_pain,
)
) as (first, chan)
):
yield chan
@acm
async def maybe_open_oi_feed(
) -> trio.abc.ReceiveStream:
# TODO: add a predicate to maybe_open_context
feed: to_asyncio.LinkedTaskChannel
async with maybe_open_context(
acm_func=open_oi_feed,
) as (cache_hit, feed):
if cache_hit:
yield broadcast_receiver(feed, 10)
else:
yield feed
# TODO, move all to `.broker` submod!
# async def aio_order_feed_relay(