1028 lines
26 KiB
Python
1028 lines
26 KiB
Python
# piker: trading gear for hackers
|
|
# Copyright (C) Guillermo Rodriguez (in stewardship for piker0)
|
|
|
|
# This program is free software: you can redistribute it and/or modify
|
|
# it under the terms of the GNU Affero General Public License as published by
|
|
# the Free Software Foundation, either version 3 of the License, or
|
|
# (at your option) any later version.
|
|
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU Affero General Public License for more details.
|
|
|
|
# You should have received a copy of the GNU Affero General Public License
|
|
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
|
|
|
'''
|
|
Deribit backend.
|
|
|
|
'''
|
|
import asyncio
|
|
from collections import ChainMap
|
|
from contextlib import (
|
|
asynccontextmanager as acm,
|
|
)
|
|
from datetime import datetime
|
|
from decimal import (
|
|
Decimal,
|
|
)
|
|
from functools import partial
|
|
from pathlib import Path
|
|
from pprint import pformat
|
|
import time
|
|
from typing import (
|
|
Any,
|
|
Optional,
|
|
Callable,
|
|
)
|
|
|
|
from pendulum import now
|
|
import trio
|
|
import numpy as np
|
|
from tractor.trionics import (
|
|
broadcast_receiver,
|
|
maybe_open_context
|
|
)
|
|
from tractor import to_asyncio
|
|
# XXX WOOPS XD
|
|
# yeah you'll need to install it since it was removed in #489 by
|
|
# accident; well i thought we had removed all usage..
|
|
from cryptofeed import FeedHandler
|
|
from cryptofeed.defines import (
|
|
DERIBIT,
|
|
L1_BOOK, TRADES,
|
|
OPTION, CALL, PUT,
|
|
OPEN_INTEREST,
|
|
)
|
|
from cryptofeed.symbols import Symbol
|
|
from cryptofeed.types import (
|
|
L1Book,
|
|
Trade,
|
|
OpenInterest,
|
|
)
|
|
from piker.brokers import SymbolNotFound
|
|
from .venues import (
|
|
_ws_url,
|
|
MarketType,
|
|
PAIRTYPES,
|
|
Pair,
|
|
OptionPair,
|
|
JSONRPCResult,
|
|
KLinesResult,
|
|
LastTradesResult,
|
|
)
|
|
from piker.accounting import (
|
|
Asset,
|
|
digits_to_dec,
|
|
MktPair,
|
|
)
|
|
from piker.data import (
|
|
def_iohlcv_fields,
|
|
match_from_pairs,
|
|
# Struct,
|
|
)
|
|
from piker.data._web_bs import (
|
|
open_jsonrpc_session
|
|
)
|
|
|
|
|
|
from piker import config
|
|
from piker.log import get_logger
|
|
|
|
|
|
# module-level logger, named after this module.
log = get_logger(__name__)


# NOTE(review): presumably extra kwargs handed to whatever spawns this
# backend's actor so that it runs with `asyncio` support (this module
# drives `cryptofeed` through `tractor.to_asyncio`) -- confirm against
# the code that consumes `_spawn_kwargs`.
_spawn_kwargs = {
    'infect_asyncio': True,
}
|
|
|
|
|
|
def deribit_timestamp(when: datetime) -> int:
    '''
    Convert a `datetime` to a unix timestamp in integer milliseconds.

    `when.timestamp()` already carries the sub-second part, so the
    microseconds must NOT be added a second time; the whole seconds
    and the millisecond remainder are combined with integer math to
    avoid both that double count and float rounding.

    Naive datetimes are interpreted by `datetime.timestamp()` (i.e.
    as local time).

    '''
    whole_secs: int = int(when.replace(microsecond=0).timestamp())
    return whole_secs * 1000 + when.microsecond // 1000
|
|
|
|
|
|
def get_timestamp_int(expiry_date: str) -> int:
    '''
    Parse a `DDMMMYY` expiry (eg. `'20DEC24'`) into epoch seconds at
    00:00 UTC of that day.

    The date is deliberately read as UTC (`calendar.timegm()`) rather
    than local time (`time.mktime()`): a local-midnight epoch lands
    on the *previous* UTC day for hosts east of UTC, which yields the
    wrong expiry once the value is converted back to a date in UTC.

    Raises `ValueError` if `expiry_date` does not match `%d%b%y`.

    '''
    import calendar

    return calendar.timegm(time.strptime(expiry_date, '%d%b%y'))
|
|
|
|
|
|
def str_to_cb_sym(name: str) -> Symbol:
    '''
    Build an option `cryptofeed.Symbol` from a 4-part dash-separated
    name laid out as `<base>-<strike>-<expiry>-<put|call>`.

    The expiry part is first run through
    `get_values_from_cb_normalized_date()` and then converted to an
    epoch `int`; the quote asset is set equal to the base.

    '''
    base, strike_price, expiry_date, option_type = name.split('-')

    cb_otypes = {
        'put': PUT,
        'call': CALL,
    }
    if option_type not in cb_otypes:
        raise Exception("Couldn\'t parse option type")

    expiry_ts: int = get_timestamp_int(
        get_values_from_cb_normalized_date(expiry_date)
    )
    return Symbol(
        base=base,
        quote=base,
        type=OPTION,
        strike_price=strike_price,
        option_type=cb_otypes[option_type],
        expiry_date=expiry_ts,
    )
|
|
|
|
|
|
def piker_sym_to_cb_sym(name: str) -> Symbol:
    '''
    Build an option `cryptofeed.Symbol` from a (case-insensitive)
    4-part dash-separated name laid out as
    `<base>-<expiry>-<strike>-<P|PUT|C|CALL>`.

    The expiry part is parsed by `get_timestamp_int()`; the quote
    asset is set equal to the base.

    '''
    parts: list[str] = name.upper().split('-')
    base, expiry_date, strike_price, option_type = parts

    # NOTE: expiry is parsed before the option-type check so a bad
    # date is reported first.
    expiry_ts = get_timestamp_int(expiry_date)

    if option_type in ('P', 'PUT'):
        cb_otype = PUT
    elif option_type in ('C', 'CALL'):
        cb_otype = CALL
    else:
        raise Exception("Couldn\'t parse option type")

    return Symbol(
        base=base,
        quote=base,
        type=OPTION,
        strike_price=strike_price,
        option_type=cb_otype,
        expiry_date=expiry_ts,
    )
|
|
|
|
|
|
# TODO, instead can't we just lookup the `MktPair` directly
|
|
# and pass it upward to `stream_quotes()`??
|
|
def cb_sym_to_deribit_inst(sym: Symbol) -> str:
    '''
    Render a `cryptofeed.Symbol` as our internal dash-separated
    `str`: `<base>-<expiry>-<strike>-<C|P>`.

    The expiry is `sym.expiry_date` passed through
    `get_values_from_cb_normalized_date()`; any option type other
    than `CALL` is rendered as `'P'`.

    '''
    expiry: str = get_values_from_cb_normalized_date(sym.expiry_date)

    if sym.option_type == CALL:
        otype = 'C'
    else:
        otype = 'P'

    return f'{sym.base}-{expiry}-{sym.strike_price}-{otype}'
|
|
|
|
|
|
def get_values_from_cb_normalized_date(expiry_date: str) -> str:
    '''
    Convert a `YY<month-code><D|DD>` expiry (eg. `'24Z20'`) into the
    `<D|DD><MON><YY>` form (eg. `'20DEC24'`).

    The single-letter month code maps positionally onto the calendar
    months: `F`=JAN, `G`=FEB, ... `Z`=DEC.

    Raises `ValueError` when the input does not have a 2-digit year,
    a known month code and a numeric day, instead of silently
    returning a truncated date.

    '''
    # deribit specific
    cb_norm: tuple[str, ...] = (
        'F', 'G', 'H', 'J',
        'K', 'M', 'N', 'Q',
        'U', 'V', 'X', 'Z',
    )
    months: tuple[str, ...] = (
        'JAN', 'FEB', 'MAR', 'APR',
        'MAY', 'JUN', 'JUL', 'AUG',
        'SEP', 'OCT', 'NOV', 'DEC',
    )
    # YYMDD
    # 01234
    year: str = expiry_date[:2]
    code: str = expiry_date[2:3]
    day: str = expiry_date[3:]

    if not (
        len(year) == 2
        and year.isdigit()
        and code in cb_norm
        and day.isdigit()
    ):
        raise ValueError(
            f'Invalid normalized expiry date: {expiry_date!r}, '
            f'expected `YY<month-code>DD` eg. `24Z20`'
        )

    month: str = months[cb_norm.index(code)]
    return f'{day}{month}{year}'
|
|
|
|
|
|
def get_config() -> dict[str, Any]:
    '''
    Load the `[deribit]` section of the `brokers` config and reshape
    it into a 2-section `dict`:

    - `'deribit'`: `key_id`/`key_secret` taken from the section's
      `option.api_key`/`option.api_secret` entries (`None` if unset),
    - `'log'`: fixed log settings (file name, level, disabled flag).

    Raises `ValueError` when the config has no `[deribit]` section.

    '''
    conf: dict
    path: Path
    conf, path = config.load(
        conf_name='brokers',
        touch_if_dne=True,
    )

    deribit_section: dict|None = conf.get('deribit')
    if deribit_section is None:
        raise ValueError(
            f'No `[deribit]` section found in\n'
            f'{path!r}\n\n'
            f'See the template config from the core repo for samples..\n'
            # f'<TODO put repo link here??>'
        )

    option_conf = deribit_section.get('option', {})

    return {
        'deribit': {
            'key_id': option_conf.get('api_key'),
            'key_secret': option_conf.get('api_secret'),
        },
        'log': {
            'filename': 'feedhandler.log',
            'level': 'DEBUG',
            'disabled': True,
        },
    }
|
|
|
|
|
|
class Client:
    '''
    Hi-level interface for the json-RPC over websocket API.

    Wraps the `json_rpc()` request callable it is constructed with
    and keeps a table of known instrument `Pair`s in `._pairs`
    (filled by `.cache_symbols()`).

    '''
    def __init__(
        self,

        json_rpc: Callable

    ) -> None:
        self._pairs: ChainMap[str, Pair] = ChainMap()

        conf: dict = get_config().get('deribit', {})

        self._key_id = conf.get('key_id')
        self._key_secret = conf.get('key_secret')

        self.json_rpc = json_rpc

        # epoch-seconds at which the current access token expires;
        # `None` until the first auth request completes.
        self._auth_ts: float | None = None
        self._auth_renew_ts = 5  # seconds to renew auth

    async def _json_rpc_auth_wrapper(
        self,
        *args,
        **kwargs,
    ) -> JSONRPCResult:
        '''
        Issue a json-RPC request, first (re)authenticating when no
        access token was acquired yet or when the current one expires
        within `._auth_renew_ts` seconds.

        All `*args`/`**kwargs` are forwarded to `.json_rpc()`.

        https://docs.deribit.com/?python#authentication-2

        '''
        access_scope = 'trade:read_write'
        current_ts: float = time.time()

        # `._auth_ts` holds the token *expiry* time, so the remaining
        # lifetime is `expiry - now`; renew once that drops below the
        # renew window (or has gone negative, i.e. already expired).
        if (
            not self._auth_ts
            or self._auth_ts - current_ts < self._auth_renew_ts
        ):
            params = {
                'grant_type': 'client_credentials',
                'client_id': self._key_id,
                'client_secret': self._key_secret,
                'scope': access_scope
            }

            resp = await self.json_rpc('public/auth', params)
            result = resp.result

            self._auth_ts = time.time() + result['expires_in']

        return await self.json_rpc(*args, **kwargs)

    async def get_balances(
        self,
        kind: str = 'option'
    ) -> dict[str, float]:
        '''
        Return the positions of `kind` for this account, keyed by
        currency name.

        Uses a `.currencies` attribute when one has been set on the
        instance, otherwise the names from `.get_currencies()`.

        '''
        # NOTE: `.currencies` is never assigned by this class, so
        # fall back to querying the currency names.
        currencies = getattr(self, 'currencies', None)
        if currencies is None:
            currencies = [
                entry['currency']
                for entry in await self.get_currencies()
            ]

        balances = {}
        for currency in currencies:
            resp = await self._json_rpc_auth_wrapper(
                'private/get_positions',
                params={
                    'currency': currency.upper(),
                    'kind': kind,
                },
            )
            balances[currency] = resp.result

        return balances

    async def get_currencies(
        self,

    ) -> list[dict]:
        '''
        Return the set of currencies for deribit.

        '''
        resp = await self._json_rpc_auth_wrapper(
            'public/get_currencies',
            params={}
        )
        return resp.result

    async def get_assets(
        self,
        venue: str | None = None,

    ) -> dict[str, Asset]:
        '''
        Return a table of `Asset`s keyed by (deribit's) symbol: one
        per currency plus one per instrument delivered by
        `.symbol_info()` for that currency.

        `venue` is currently unused.

        '''
        assets: dict[str, Asset] = {}
        for currency in await self.get_currencies():
            name: str = currency['currency']
            tx_tick: Decimal = digits_to_dec(currency['fee_precision'])

            # TODO, handling of options, futures, perps etc. more
            # specifically with diff `.atype`s?
            assets[name] = Asset(
                name=name,
                atype='crypto_currency',
                tx_tick=tx_tick,
            )

            instruments = await self.symbol_info(currency=name)
            for key in instruments:
                pair = instruments[key]
                assets[pair.symbol] = Asset(
                    name=pair.symbol,
                    atype=pair.venue,
                    tx_tick=pair.size_tick,
                )

        return assets

    async def get_mkt_pairs(self) -> dict[str, Pair]:
        '''
        Return a flat copy of the cached pairs keyed by `.bs_fqme`.

        '''
        flat: dict[str, Pair] = {}
        for key in self._pairs:
            pair = self._pairs.get(key)
            flat[pair.bs_fqme] = pair

        return flat

    async def get_instruments(
        self,
        currency: str = 'btc',
        kind: str = 'option',
        expired: bool = False,
        expiry_date: str | None = None,

    ) -> list[Symbol]:
        '''
        Get instruments for cryptoFeed.FeedHandler.

        Requests `public/get_instruments` and converts every
        returned instrument name to a `cryptofeed.Symbol`; when
        `expiry_date` is given only instruments whose 2nd name part
        equals it (case-insensitively) are kept.

        '''
        params: dict[str, Any] = {
            'currency': currency.upper(),
            'kind': kind,
            'expired': expired,
        }

        r: JSONRPCResult = await self._json_rpc_auth_wrapper(
            'public/get_instruments',
            params,
        )

        wanted: str | None = (
            expiry_date.upper() if expiry_date else None
        )
        symbols: list[Symbol] = []
        for item in r.result:
            inst_name: str = item['instrument_name']
            if (
                wanted is None
                or inst_name.split('-')[1] == wanted
            ):
                symbols.append(piker_sym_to_cb_sym(inst_name))

        return symbols

    def get_strikes_dict(
        self,
        instruments: list[Symbol],

    ) -> dict[str, dict[str, Decimal | None]]:
        '''
        Get a dict with strike prices as keys.

        The strike is the 2nd dash-separated part of each
        instrument's `str()`; every entry starts out as
        `{'C': None, 'P': None}`.

        '''
        # NOTE: no nested same-type quotes inside the f-string/expr,
        # that only parses on py3.12+.
        return {
            str(element).split('-')[1]: {
                'C': None,
                'P': None,
            }
            for element in instruments
        }

    async def submit_limit(
        self,
        symbol: str,
        price: float,
        action: str,
        size: float
    ) -> dict:
        '''
        Place a limit order via the `private/<action>` endpoint and
        return the response's result.

        '''
        params = {
            'instrument_name': symbol.upper(),
            'amount': size,
            'type': 'limit',
            'price': price,
        }
        resp = await self._json_rpc_auth_wrapper(
            f'private/{action}', params)

        return resp.result

    async def submit_cancel(self, oid: str):
        '''
        Send cancel request for order id.

        '''
        resp = await self._json_rpc_auth_wrapper(
            'private/cancel', {'order_id': oid})
        return resp.result

    async def exch_info(
        self,
        sym: str | None = None,

        venue: MarketType = 'option',
        expiry: str | None = None,

    ) -> dict[str, Pair] | Pair:
        '''
        Look up `sym` in the cached pair table, or return the whole
        table when no `sym` is given.

        Raises `ValueError`, listing the closest fuzzy matches, when
        `sym` is not in the table. `venue` and `expiry` are currently
        unused.

        '''
        pair_table: dict[str, Pair] = self._pairs

        if not sym:
            return pair_table

        if cached_pair := pair_table.get(sym):
            return cached_pair

        closest_matches: dict[str, Pair] = match_from_pairs(
            pairs=pair_table,
            query=sym,
            score_cutoff=40,
        )
        closest_syms: list[str] = list(closest_matches.keys())
        raise ValueError(
            f'No contract found for {sym!r}\n\n'
            f'Closest {len(closest_syms)} available contracts:\n\n'
            f'{pformat(closest_syms)}\n'
        )

    async def symbol_info(
        self,
        instrument: Optional[str] = None,
        currency: str = 'btc',  # BTC, ETH, SOL, USDC
        kind: str = 'option',
        expired: bool = False

    ) -> dict[str, Pair] | Pair:
        '''
        Get symbol infos.

        Returns the cached `._pairs` table when it is non-empty;
        otherwise requests `public/get_instruments` and builds a
        lowercase-symbol keyed table of `PAIRTYPES[kind]` instances,
        returning only the entry for `instrument` when one is given.

        '''
        # NOTE(review): the cache short-circuit ignores `instrument`,
        # `currency`, `kind` and `expired` -- confirm callers expect
        # the full table back once symbols are cached.
        if self._pairs:
            return self._pairs

        # will retrieve all symbols by default
        params: dict[str, Any] = {
            'currency': currency.upper(),
            'kind': kind,
            'expired': expired,
        }

        resp: JSONRPCResult = await self._json_rpc_auth_wrapper(
            'public/get_instruments',
            params,
        )
        # convert to symbol-keyed table
        pair_type: Pair = PAIRTYPES[kind]
        results: list[dict] | None = resp.result

        instruments: dict[str, Pair] = {}
        for item in results:
            symbol = item['instrument_name'].lower()
            try:
                pair: Pair = pair_type(
                    symbol=symbol,
                    **item
                )
            except Exception as e:
                e.add_note(
                    "\nDon't panic, prolly stupid deribit changed their symbology schema again..\n"
                    'Check out their API docs here:\n\n'
                    'https://docs.deribit.com/?python#deribit-api-v2-1-1'
                )
                raise

            instruments[symbol] = pair

        if instrument is not None:
            return instruments[instrument.lower()]

        return instruments

    async def cache_symbols(
        self,
        venue: MarketType = 'option',

    ) -> ChainMap[str, Pair]:
        '''
        Fetch all pairs via `.symbol_info()` and store them in
        `._pairs`, keyed both by upper-cased symbol and (in an
        appended sub-table) by `.bs_fqme`.

        Returns the updated `._pairs` table; raises `SymbolNotFound`
        when no pairs were delivered.

        '''
        # lookup internal mkt-specific pair table to update
        pair_table: dict[str, Pair] = self._pairs

        # make API request(s)
        mkt_pairs = await self.symbol_info()

        if not mkt_pairs:
            raise SymbolNotFound(
                f'No market pairs found!?:\n'
                f'{mkt_pairs}'
            )

        pair_type: Pair|OptionPair = PAIRTYPES[venue]
        pairs_view_subtable: dict[str, Pair] = {}

        for instrument in mkt_pairs:
            pair: Pair = pair_type(**mkt_pairs[instrument].to_dict())

            pair_table[pair.symbol.upper()] = pair

            # update an additional top-level-cross-venue-table
            # `._pairs: ChainMap` for search B0
            pairs_view_subtable[pair.bs_fqme] = pair

        self._pairs.maps.append(pairs_view_subtable)

        return self._pairs

    async def search_symbols(
        self,
        pattern: str,
        limit: int = 30,
    ) -> dict[str, Pair]:
        '''
        Fuzzy search symbology set for pairs matching `pattern`.

        '''
        pairs: dict[str, Pair] = await self.exch_info()

        return match_from_pairs(
            pairs=pairs,
            query=pattern.upper(),
            score_cutoff=35,
            limit=limit
        )

    async def bars(
        self,
        mkt: MktPair,

        start_dt: Optional[datetime] = None,
        end_dt: Optional[datetime] = None,

        limit: int = 1000,
        as_np: bool = True,

    ) -> list[tuple] | np.ndarray:
        '''
        Request 1-minute chart data for `mkt` between `start_dt` and
        `end_dt` and return it as a `def_iohlcv_fields` array.

        `end_dt` defaults to now (UTC) and `start_dt` to `limit`
        minutes before the start of `end_dt`'s minute. Row times are
        derived from the requested start time plus one minute per
        row.

        '''
        instrument: str = mkt.bs_fqme.split('.')[0]

        if end_dt is None:
            end_dt = now('UTC')

        if start_dt is None:
            start_dt = end_dt.start_of(
                'minute'
            ).subtract(minutes=limit)

        start_time: int = deribit_timestamp(start_dt)
        end_time: int = deribit_timestamp(end_dt)

        # https://docs.deribit.com/#public-get_tradingview_chart_data
        resp = await self._json_rpc_auth_wrapper(
            'public/get_tradingview_chart_data',
            params={
                'instrument_name': instrument.upper(),
                'start_timestamp': start_time,
                'end_timestamp': end_time,
                'resolution': '1'
            })

        result = KLinesResult(**resp.result)

        # NOTE(review): with `as_np=False` the raw `KLinesResult` is
        # handed back, not the `list[tuple]` the annotation promises
        # -- kept as-is for existing callers, confirm intent.
        if not as_np:
            return result

        bar_ms: int = 60 * 1000
        new_bars: list[tuple] = [
            (
                i,
                (start_time + (i * bar_ms)) / 1000.0,  # time
                result.open[i],
                result.high[i],
                result.low[i],
                result.close[i],
                result.volume[i],
            )
            for i in range(len(result.close))
        ]

        return np.array(
            new_bars,
            dtype=def_iohlcv_fields
        )

    async def last_trades(
        self,
        instrument: str,
        count: int = 10
    ):
        '''
        Return the last `count` trades for `instrument` wrapped in a
        `LastTradesResult`.

        '''
        resp = await self._json_rpc_auth_wrapper(
            'public/get_last_trades_by_instrument',
            params={
                'instrument_name': instrument,
                'count': count
            })

        return LastTradesResult(**resp.result)
|
|
|
|
|
|
@acm
async def get_client(
    is_brokercheck: bool = False,
    venue: MarketType = 'option',
) -> Client:
    '''
    Open a json-RPC websocket session, wrap it in a `Client` with
    its symbol cache pre-loaded and yield that client.

    The surrounding nursery's cancel scope is cancelled once the
    caller's block exits normally. `is_brokercheck` and `venue` are
    currently unused.

    '''
    async with trio.open_nursery() as nursery:
        async with open_jsonrpc_session(
            _ws_url,
            response_type=JSONRPCResult,
        ) as json_rpc:
            client = Client(json_rpc)
            await client.cache_symbols()
            yield client
            nursery.cancel_scope.cancel()
|
|
|
|
|
|
@acm
async def open_feed_handler() -> FeedHandler:
    '''
    Yield a `cryptofeed.FeedHandler` configured from `get_config()`
    and, once the caller's block exits normally, run its
    `.stop_async()` through `to_asyncio.run_task()`.

    '''
    handler = FeedHandler(config=get_config())
    yield handler
    await to_asyncio.run_task(handler.stop_async)
|
|
|
|
|
|
@acm
async def maybe_open_feed_handler() -> trio.abc.ReceiveStream:
    '''
    Yield the feed handler delivered by `maybe_open_context()` for
    `open_feed_handler` under the fixed key `'feedhandler'`.

    '''
    async with maybe_open_context(
        acm_func=open_feed_handler,
        key='feedhandler',
    ) as ctx_result:
        _cache_hit, handler = ctx_result
        yield handler
|
|
|
|
|
|
async def aio_price_feed_relay(
    fh: FeedHandler,
    instrument: str,
    from_trio: asyncio.Queue,
    to_trio: trio.abc.SendChannel,
) -> None:
    '''
    Subscribe `fh` to the trades and L1-book channels of
    `instrument` and push every update to `to_trio` as a
    `('trade', <Trade>)` or `('l1', <L1Book>)` tuple.

    A single `None` is sent once the feed is registered (and the
    handler started if it was not running) to signal readiness;
    after that the task sleeps until cancelled.

    '''
    async def relay_trade(
        trade: Trade,  # cryptofeed, NOT ours from `.venues`!
        receipt_timestamp: int,
    ) -> None:
        '''
        Forward a `cryptofeed` trade msg to the `trio` side.

        '''
        to_trio.send_nowait(('trade', trade))

    async def relay_l1(
        book: L1Book,
        receipt_timestamp: int,
    ) -> None:
        '''
        Forward a `cryptofeed` L1 book update to the `trio` side.

        '''
        to_trio.send_nowait(('l1', book))

    # TODO, make this work!
    # -[ ] why isn't this working in `tractor.pause_from_sync()`??
    # breakpoint()

    cb_sym: Symbol = piker_sym_to_cb_sym(instrument)
    callbacks = {
        TRADES: relay_trade,
        L1_BOOK: relay_l1,
    }
    fh.add_feed(
        DERIBIT,
        channels=[TRADES, L1_BOOK],
        symbols=[cb_sym],
        callbacks=callbacks,
    )

    if not fh.running:
        fh.run(
            start_loop=False,
            install_signal_handlers=False,
        )

    # sync with trio
    to_trio.send_nowait(None)

    # run until cancelled
    await asyncio.sleep(float('inf'))
|
|
|
|
|
|
@acm
async def open_price_feed(
    instrument: str
) -> to_asyncio.LinkedTaskChannel:
    '''
    Start `aio_price_feed_relay()` for `instrument` on the (maybe
    shared) feed handler and yield the channel linked to that task.

    '''
    fh: FeedHandler
    first: None
    chan: to_asyncio.LinkedTaskChannel

    async with maybe_open_feed_handler() as fh:
        relay = partial(
            aio_price_feed_relay,
            fh,
            instrument,
        )
        async with to_asyncio.open_channel_from(relay) as (first, chan):
            yield chan
|
|
|
|
|
|
@acm
async def maybe_open_price_feed(
    instrument: str
) -> trio.abc.ReceiveStream:
    '''
    Yield a price feed for `instrument`, opened through
    `maybe_open_context()` under the key `'<instrument>-price'`.

    Only the part of `instrument` before the first `'.'` is used.
    On a cache hit the feed is wrapped in a `broadcast_receiver()`
    (buffer size 10), otherwise the feed itself is yielded.

    '''
    # split once and build the key outside any f-string expression:
    # re-using the enclosing quote type inside an f-string only
    # parses on py3.12+.
    bs_instrument: str = instrument.split('.')[0]

    # TODO: add a predicate to maybe_open_context
    feed: to_asyncio.LinkedTaskChannel
    async with maybe_open_context(
        acm_func=open_price_feed,
        kwargs={
            'instrument': bs_instrument,
        },
        key=f'{bs_instrument}-price',
    ) as (cache_hit, feed):
        if cache_hit:
            yield broadcast_receiver(feed, 10)
        else:
            yield feed
|
|
|
|
|
|
def _calc_max_pain(
    oi_by_strikes: dict[str, dict[str, Decimal]],
) -> tuple[str | Decimal, Decimal]:
    '''
    Find the strike with the smallest total intrinsic value over a
    complete open-interest table and return `(strike, total)`.

    For every candidate settlement strike the call and put cash are
    summed over all strikes as `max(0, (s - k) * oi_call)` and
    `max(0, (k - s) * oi_put)`.

    '''
    max_pain: str | Decimal = Decimal(0)
    min_total: Decimal = Decimal('Infinity')

    for settle in oi_by_strikes:
        s: Decimal = Decimal(f'{settle}')
        call_cash: Decimal = Decimal(0)
        put_cash: Decimal = Decimal(0)
        for strike, oi in oi_by_strikes.items():
            k: Decimal = Decimal(f'{strike}')
            call_cash += max(0, (s - k) * oi['C'])
            put_cash += max(0, (k - s) * oi['P'])

        total = call_cash + put_cash
        if total < min_total:
            min_total = total
            max_pain = settle

    return max_pain, min_total


async def aio_open_interest_feed_relay(
    fh: FeedHandler,
    instruments: list,
    oi_by_strikes: dict[str, dict[str, Decimal]],
    from_trio: asyncio.Queue,
    to_trio: trio.abc.SendChannel,
) -> None:
    '''
    Subscribe `fh` to the trades and open-interest channels of
    `instruments`.

    Trades are pushed to `to_trio` as `('trade', <Trade>)`. Each
    open-interest update is stored in `oi_by_strikes` (mutated in
    place) and, once every strike has both a call and a put value,
    the current max-pain strike is printed.

    A single `None` is sent once the feed is registered (and the
    handler started if it was not running) to signal readiness;
    after that the task sleeps until cancelled.

    '''
    async def _trade(
        trade: Trade,  # cryptofeed, NOT ours from `.venues`!
        receipt_timestamp: int,
    ) -> None:
        '''
        Proxy-thru `cryptofeed.FeedHandler` "trades" to `piker`-side.

        '''
        to_trio.send_nowait(('trade', trade))

    # trade and oi are user defined functions that
    # will be called when trade and open interest updates are received
    # data type is not dict, is an object: cryptofeed.types.OpenInterest
    async def _oi(
        oi: OpenInterest,
        receipt_timestamp: int,
    ) -> None:
        '''
        Record an open-interest update and report max pain once the
        table is complete.

        '''
        symbol: Symbol = str_to_cb_sym(oi.symbol)
        piker_sym: str = cb_sym_to_deribit_inst(symbol)
        (
            _base,
            expiry_date,
            strike_price,
            option_type,
        ) = piker_sym.split('-')

        oi_by_strikes[f'{strike_price}'][f'{option_type}'] = oi.open_interest

        # the table is complete once no side of any strike is still
        # unset (`None`).
        # NOTE: inlined since no `check_if_complete()` is defined or
        # imported in the visible part of this module.
        is_complete: bool = all(
            side_oi is not None
            for sides in oi_by_strikes.values()
            for side_oi in sides.values()
        )
        if not is_complete:
            return

        # recompute from scratch on every update; a minimum carried
        # over from earlier updates would pin the result to the
        # lowest total ever seen.
        max_pain, total_intrinsic_value = _calc_max_pain(oi_by_strikes)

        print('-----------------------------------------------')
        print(f'expiry date: {expiry_date}')
        print(f'max_pain: {max_pain}')
        print(f'total intrinsic value: {total_intrinsic_value}')
        print('-----------------------------------------------')

    channels = [TRADES, OPEN_INTEREST]
    callbacks = {TRADES: _trade, OPEN_INTEREST: _oi}

    fh.add_feed(
        DERIBIT,
        channels=channels,
        symbols=instruments,
        callbacks=callbacks
    )

    if not fh.running:
        fh.run(
            start_loop=False,
            install_signal_handlers=False
        )

    # sync with trio
    to_trio.send_nowait(None)

    # run until cancelled
    await asyncio.sleep(float('inf'))
|
|
|
|
|
|
@acm
async def open_oi_feed(
    expiry_date: str = '20DEC24',
) -> to_asyncio.LinkedTaskChannel:
    '''
    Start `aio_open_interest_feed_relay()` for all instruments
    expiring on `expiry_date` and yield the channel linked to that
    task.

    `expiry_date` uses the `DDMMMYY` form found in instrument names;
    it defaults to the previously hard-coded `'20DEC24'`. The
    instrument list and the empty per-strike table are fetched with
    a short-lived client before the relay is started.

    '''
    instruments: list[Symbol] = []
    oi_by_strikes: dict[str, dict[str, Decimal]]

    async with get_client(
    ) as client:
        instruments = await client.get_instruments(
            expiry_date=expiry_date,
        )
        oi_by_strikes = client.get_strikes_dict(instruments)

    fh: FeedHandler
    first: None
    chan: to_asyncio.LinkedTaskChannel
    async with (
        maybe_open_feed_handler() as fh,
        to_asyncio.open_channel_from(
            partial(
                aio_open_interest_feed_relay,
                fh,
                instruments,
                oi_by_strikes,
            )
        ) as (first, chan)
    ):
        yield chan
|
|
|
|
|
|
@acm
async def maybe_open_oi_feed(
) -> trio.abc.ReceiveStream:
    '''
    Yield the open-interest feed opened through
    `maybe_open_context()` for `open_oi_feed`.

    On a cache hit the feed is wrapped in a `broadcast_receiver()`
    (buffer size 10), otherwise the feed itself is yielded.

    '''
    # TODO: add a predicate to maybe_open_context
    feed: to_asyncio.LinkedTaskChannel
    async with maybe_open_context(
        acm_func=open_oi_feed,
    ) as (cache_hit, feed):
        if not cache_hit:
            yield feed
        else:
            yield broadcast_receiver(feed, 10)
|
|
|
|
|
|
# TODO, move all to `.broker` submod!
|
|
# async def aio_order_feed_relay(
|
|
# fh: FeedHandler,
|
|
# instrument: Symbol,
|
|
# from_trio: asyncio.Queue,
|
|
# to_trio: trio.abc.SendChannel,
|
|
# ) -> None:
|
|
# async def _fill(data: dict, receipt_timestamp):
|
|
# breakpoint()
|
|
|
|
# async def _order_info(data: dict, receipt_timestamp):
|
|
# breakpoint()
|
|
|
|
# fh.add_feed(
|
|
# DERIBIT,
|
|
# channels=[FILLS, ORDER_INFO],
|
|
# symbols=[instrument.upper()],
|
|
# callbacks={
|
|
# FILLS: _fill,
|
|
# ORDER_INFO: _order_info,
|
|
# })
|
|
|
|
# if not fh.running:
|
|
# fh.run(
|
|
# start_loop=False,
|
|
# install_signal_handlers=False)
|
|
|
|
# # sync with trio
|
|
# to_trio.send_nowait(None)
|
|
|
|
# await asyncio.sleep(float('inf'))
|
|
|
|
|
|
# @acm
|
|
# async def open_order_feed(
|
|
# instrument: list[str]
|
|
# ) -> trio.abc.ReceiveStream:
|
|
# async with maybe_open_feed_handler() as fh:
|
|
# async with to_asyncio.open_channel_from(
|
|
# partial(
|
|
# aio_order_feed_relay,
|
|
# fh,
|
|
# instrument
|
|
# )
|
|
# ) as (first, chan):
|
|
# yield chan
|
|
|
|
|
|
# @acm
|
|
# async def maybe_open_order_feed(
|
|
# instrument: str
|
|
# ) -> trio.abc.ReceiveStream:
|
|
|
|
# # TODO: add a predicate to maybe_open_context
|
|
# async with maybe_open_context(
|
|
# acm_func=open_order_feed,
|
|
# kwargs={
|
|
# 'instrument': instrument.split('.')[0],
|
|
# 'fh': fh
|
|
# },
|
|
# key=f'{instrument.split('.')[0]}-order',
|
|
# ) as (cache_hit, feed):
|
|
# if cache_hit:
|
|
# yield broadcast_receiver(feed, 10)
|
|
# else:
|
|
# yield feed
|