# piker: trading gear for hackers
# Copyright (C) Guillermo Rodriguez (in stewardship for piker0)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.

'''
Deribit backend.

'''
from contextlib import asynccontextmanager as acm
from datetime import datetime
from typing import Any, Optional, Callable
from pprint import pformat
import time

import trio
from trio_typing import TaskStatus
from pendulum import (
    from_timestamp,
    now,
)
from rapidfuzz import process as fuzzy
import numpy as np
import tractor

from piker.accounting import (
    Asset,
    MktPair,
    unpack_fqme,
)
from piker.brokers import (
    open_cached_client,
    NoData,
)
from piker._cacheables import (
    async_lifo_cache,
)
from piker.log import get_logger, get_console_log
from piker.data import ShmArray
from piker.data.validate import FeedInit
from piker.brokers._util import (
    BrokerError,
    DataUnavailable,
)

from cryptofeed import FeedHandler
from cryptofeed.defines import (
    DERIBIT, L1_BOOK, TRADES, OPTION, CALL, PUT
)
from cryptofeed.symbols import Symbol

from .api import (
    Client, Trade,
    get_config,
    piker_sym_to_cb_sym, cb_sym_to_deribit_inst,
    maybe_open_price_feed
)
from .venues import (
    Pair,
    OptionPair,
)

_spawn_kwargs = {
    'infect_asyncio': True,
}


log = get_logger(__name__)

@acm
async def open_history_client(
    mkt: MktPair,
) -> tuple[Callable, int]:

    # TODO implement history getter for the new storage layer.
    async with open_cached_client('deribit') as client:

        async def get_ohlc(
            timeframe: float,
            end_dt: datetime | None = None,
            start_dt: datetime | None = None,

        ) -> tuple[
            np.ndarray,
            datetime,  # start
            datetime,  # end
        ]:
            if timeframe != 60:
                raise DataUnavailable('Only 1m bars are supported')

            array: np.ndarray = await client.bars(
                mkt,
                start_dt=start_dt,
                end_dt=end_dt,
            )
            if len(array) == 0:
                raise NoData(
                    f'No frame for {start_dt} -> {end_dt}\n'
                )

            start_dt = from_timestamp(array[0]['time'])
            end_dt = from_timestamp(array[-1]['time'])

            times = array['time']
            if not times.any():
                raise ValueError(
                    'Bad frame with null-times?\n\n'
                    f'{times}'
                )

            if end_dt is None:
                inow: int = round(time.time())
                if (inow - times[-1]) > 60:
                    await tractor.pause()

            return array, start_dt, end_dt

        yield get_ohlc, {'erlangs': 3, 'rate': 3}
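
# Illustrative only: a minimal sketch of how the yielded `get_ohlc`
# endpoint might be driven by the history layer, assuming `mkt` is a
# `MktPair` obtained via `get_mkt_info()`. The local names below are
# hypothetical and not part of this module:
#
#   async with open_history_client(mkt) as (get_ohlc, rate_limits):
#       array, start_dt, end_dt = await get_ohlc(60)  # only 1m bars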


@async_lifo_cache()
async def get_mkt_info(
    fqme: str,

) -> tuple[MktPair, Pair] | None:

    # ensure the fqme is fully-qualified with the `.deribit` broker suffix
    if 'deribit' not in fqme.lower():
        fqme += '.deribit'

    mkt_mode: str = ''
    broker, mkt_ep, venue, expiry = unpack_fqme(fqme)

    # NOTE: we always upper case all tokens to be consistent with
    # binance's symbology style for pairs, like `BTCUSDT`, but in
    # theory we could also just keep things lower case; as long as
    # we're consistent and the symcache matches whatever this func
    # returns, always!
    expiry: str = expiry.upper()
    venue: str = venue.upper()
    venue_lower: str = venue.lower()

    mkt_mode: str = 'option'

    async with open_cached_client(
        'deribit',
    ) as client:

        assets: dict[str, Asset] = await client.get_assets()
        pair_str: str = mkt_ep.lower()

        pair: Pair = await client.exch_info(
            sym=pair_str,
        )
        mkt_mode = pair.venue
        client.mkt_mode = mkt_mode

        dst: Asset | None = assets.get(pair.bs_dst_asset)
        src: Asset | None = assets.get(pair.bs_src_asset)

        mkt = MktPair(
            dst=dst,
            src=src,
            price_tick=pair.price_tick,
            size_tick=pair.size_tick,
            bs_mktid=pair.symbol,
            expiry=pair.expiry,
            venue=mkt_mode,
            broker='deribit',
            _atype=mkt_mode,
            _fqme_without_src=True,
        )
        return mkt, pair
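
# Illustrative only: a rough sketch of resolving a market + pair from an
# fqme string; the fqme below is a hypothetical placeholder, not a
# verified Deribit instrument name:
#
#   mkt, pair = await get_mkt_info('btc-usd.deribit')
#   assert mkt.venue == pair.venue  # venue is taken from the pair info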


async def stream_quotes(

    send_chan: trio.abc.SendChannel,
    symbols: list[str],
    feed_is_live: trio.Event,
    loglevel: str = None,

    # startup sync
    task_status: TaskStatus[tuple[dict, dict]] = trio.TASK_STATUS_IGNORED,

) -> None:
    # XXX: required to propagate ``tractor`` loglevel to piker logging
    get_console_log(loglevel or tractor.current_actor().loglevel)

    sym = symbols[0].split('.')[0]

    init_msgs: list[FeedInit] = []

    async with (
        open_cached_client('deribit') as client,
        send_chan as send_chan
    ):

        mkt, pair = await get_mkt_info(sym)

        # build out init msgs according to latest spec
        init_msgs.append(
            FeedInit(mkt_info=mkt)
        )
        nsym = piker_sym_to_cb_sym(sym)

        async with maybe_open_price_feed(sym) as stream:

            cache = client._pairs

            last_trades = (await client.last_trades(
                cb_sym_to_deribit_inst(nsym), count=1)).trades

            if len(last_trades) == 0:
                last_trade = None
                async for typ, quote in stream:
                    if typ == 'trade':
                        last_trade = Trade(**(quote['data']))
                        break

            else:
                last_trade = Trade(**(last_trades[0]))

            first_quote = {
                'symbol': sym,
                'last': last_trade.price,
                'brokerd_ts': last_trade.timestamp,
                'ticks': [{
                    'type': 'trade',
                    'price': last_trade.price,
                    'size': last_trade.amount,
                    'broker_ts': last_trade.timestamp
                }]
            }
            task_status.started((init_msgs, first_quote))

            feed_is_live.set()

            async for typ, quote in stream:
                topic = quote['symbol']
                await send_chan.send({topic: quote})


@tractor.context
async def open_symbol_search(
    ctx: tractor.Context,
) -> Client:
    async with open_cached_client('deribit') as client:

        # load all symbols locally for fast search
        cache = client._pairs
        await ctx.started()

        async with ctx.open_stream() as stream:

            pattern: str
            async for pattern in stream:
                # NOTE: pattern fuzzy-matching is done within
                # the method impl.
                pairs: dict[str, Pair] = await client.search_symbols(
                    pattern,
                )
                # repack in fqme-keyed table
                byfqme: dict[str, Pair] = {}
                for pair in pairs.values():
                    byfqme[pair.bs_fqme] = pair

                await stream.send(byfqme)
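
# Illustrative only: a minimal sketch of how a search client might drive
# this context from another actor, assuming a connected `tractor.Portal`
# named `portal` (a hypothetical local, not part of this module):
#
#   async with (
#       portal.open_context(open_symbol_search) as (ctx, _),
#       ctx.open_stream() as stream,
#   ):
#       await stream.send('btc')          # fuzzy search pattern
#       byfqme = await stream.receive()   # fqme-keyed `Pair` table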