Initial deribit mock up

deribit
Guillermo Rodriguez 2022-06-01 10:22:02 -03:00
parent 10ea242143
commit 7acc4e3208
No known key found for this signature in database
GPG Key ID: EC3AB66D5D83B392
1 changed files with 407 additions and 0 deletions

View File

@ -0,0 +1,407 @@
# piker: trading gear for hackers
# Copyright (C) Guillermo Rodriguez (in stewardship for piker0)
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
Deribit backend
"""
from contextlib import asynccontextmanager as acm
from datetime import datetime
from typing import (
Any, Union, Optional,
AsyncGenerator, Callable,
)
import time
import trio
from trio_typing import TaskStatus
import pendulum
import asks
from fuzzywuzzy import process as fuzzy
import numpy as np
import tractor
from pydantic.dataclasses import dataclass
from pydantic import BaseModel
import wsproto
from .._cacheables import open_cached_client
from ._util import resproc, SymbolNotFound
from ..log import get_logger, get_console_log
from ..data import ShmArray
from ..data._web_bs import open_autorecon_ws, NoBsWs
from cryptofeed import FeedHandler
from cryptofeed.callback import (
L1BookCallback,
TradeCallback
)
from cryptofeed.defines import DERIBIT, L1_BOOK, TRADES
log = get_logger(__name__)
_url = 'https://www.deribit.com'
# Broker specific ohlc schema (rest)
_ohlc_dtype = [
('index', int),
('time', int),
('open', float),
('high', float),
('low', float),
('close', float),
('volume', float),
('bar_wap', float), # will be zeroed by sampler if not filled
]
class KLinesResult(BaseModel):
close: List[float]
cost: List[float]
high: List[float]
low: List[float]
open: List[float]
status: str
ticks: List[int]
volume: List[float]
class KLines(BaseModel):
id: int
jsonrpc: str = '2.0'
result: KLinesResult
# convert datetime obj timestamp to unixtime in milliseconds
def deribit_timestamp(when):
return int((when.timestamp() * 1000) + (when.microsecond / 1000))
class Client:
def __init__(self) -> None:
self._sesh = asks.Session(connections=4)
self._sesh.base_location = _url
self._pairs: dict[str, Any] = {}
async def _api(
self,
method: str,
params: dict,
) -> dict[str, Any]:
resp = await self._sesh.get(
path=f'/api/v2/public/{method}',
params=params,
timeout=float('inf')
)
return resproc(resp, log)
async def symbol_info(
self,
instrument: Optional[str] = None,
currency: str = 'btc', # BTC, ETH, SOL, USDC
kind: str = 'option',
expired: bool = False
) -> dict[str, Any]:
'''Get symbol info for the exchange.
'''
# TODO: we can load from our self._pairs cache
# on repeat calls...
# will retrieve all symbols by default
params = {
'currency': currency.to_upper(),
'kind': kind,
'expired': expired
}
resp = await self._api(
'get_instrument', params=params)
if 'result' in resp:
raise SymbolNotFound
results = resp['result']
instruments = {
item['instrument_name']: item for item in results}
if instrument is not None:
return instruments[instrument]
else:
return instruments
async def cache_symbols(
self,
) -> dict:
if not self._pairs:
self._pairs = await self.symbol_info()
return self._pairs
async def search_symbols(
self,
pattern: str,
limit: int = None,
) -> dict[str, Any]:
if self._pairs is not None:
data = self._pairs
else:
data = await self.symbol_info()
matches = fuzzy.extractBests(
pattern,
data,
score_cutoff=50,
)
# repack in dict form
return {item[0]['instrument_name']: item[0]
for item in matches}
async def bars(
self,
instrument: str,
start_dt: Optional[datetime] = None,
end_dt: Optional[datetime] = None,
as_np: bool = True,
) -> dict:
if end_dt is None:
end_dt = pendulum.now('UTC')
if start_dt is None:
start_dt = end_dt.start_of(
'minute').subtract(minutes=limit)
start_time = deribit_timestamp(start_dt)
end_time = deribit_timestamp(end_dt)
# https://docs.deribit.com/#public-get_tradingview_chart_data
response = await self._api(
'get_tradingview_chart_data',
params={
'instrument_name': instrument.upper(),
'start_timestamp': start_time,
'end_timestamp': end_time,
'resolution': '1'
}
)
klines = KLines(**response)
result = klines.result
new_bars = []
for i in range(len(result.close)):
_open = result.open[i]
high = result.high[i]
low = result.low[i]
close = result.close[i]
volume = result.volume[i]
row = [
(start_time + (i * (60 * 1000))) / 1000.0, # time
result.open[i],
result.high[i],
result.low[i],
result.close[i],
result.volume[i]
]
new_bars.append((i,) + tuple(row))
array = np.array(
[i, ], dtype=_ohlc_dtype) if as_np else klines
return array
@acm
async def get_client() -> Client:
client = Client()
await client.cache_symbols()
yield client
# inside here we are in an asyncio context
async def open_aio_cryptofeed_relay(
from_trio: asyncio.Queue,
to_trio: trio.abc.SendChannel,
event_consumers: dict[str, trio.Event],
instruments: List[str] = []
) -> None:
async def trade_cb(feed, instrument, data: dict, receipt_timestamp):
to_trio.send_nowait({
'type': 'trade',
instrument: data,
'receipt': receipt_timestamp})
async def l1_book_cb(feed, instrument, data: dict, receipt_timestamp):
to_trio.send_nowait({
'type': 'l1_book',
instrument: data,
'receipt': receipt_timestamp})
fh = FeedHandler()
fh.run(start_loop=False)
fh.add_feed(
DERIBIT,
channels=[TRADES],
symbols=instruments,
callbacks={TRADES: TradeCallback(trade_cb)})
fh.add_feed(
DERIBIT,
channels=[L1_BOOK],
symbols=instruments,
callbacks={L1_BOOK: L1BookCallback(l1_book_cb)})
# sync with trio
to_trio.send_nowait(None)
await from_trio.get()
@acm
async def open_cryptofeeds():
# try:
event_table = {}
async with (
to_asyncio.open_channel_from(
open_aio_cryptofeed_relay,
event_consumers=event_table,
instruments=['BTC-10JUN22-30000-C']
) as (first, chan),
trio.open_nursery() as n,
):
assert first is None
async def relay_events():
async with chan.subscribe() as msg_stream:
async for msg in msg_stream:
print(msg)
n.start_soon(relay_events)
yield None
await chan.send(None)
@acm
async def open_history_client(
instrument: str,
) -> tuple[Callable, int]:
# TODO implement history getter for the new storage layer.
async with open_cached_client('deribit') as client:
async def get_ohlc(
end_dt: Optional[datetime] = None,
start_dt: Optional[datetime] = None,
) -> tuple[
np.ndarray,
datetime, # start
datetime, # end
]:
array = await client.bars(
instrument,
start_dt=start_dt,
end_dt=end_dt,
)
start_dt = pendulum.from_timestamp(array[0]['time'])
end_dt = pendulum.from_timestamp(array[-1]['time'])
return array, start_dt, end_dt
yield get_ohlc, {'erlangs': 3, 'rate': 3}
async def backfill_bars(
instrument: str,
shm: ShmArray, # type: ignore # noqa
task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,
) -> None:
"""Fill historical bars into shared mem / storage afap.
"""
with trio.CancelScope() as cs:
async with open_cached_client('deribit') as client:
bars = await client.bars(instrument)
shm.push(bars)
task_status.started(cs)
async def stream_quotes(
send_chan: trio.abc.SendChannel,
symbols: list[str],
feed_is_live: trio.Event,
loglevel: str = None,
# startup sync
task_status: TaskStatus[tuple[dict, dict]] = trio.TASK_STATUS_IGNORED,
) -> None:
# XXX: required to propagate ``tractor`` loglevel to piker logging
get_console_log(loglevel or tractor.current_actor().loglevel)
sym_infos = {}
uid = 0
async with (
open_cached_client('deribit') as client,
send_chan as send_chan,
):
# keep client cached for real-time section
cache = await client.cache_symbols()
breakpoint()
@tractor.context
async def open_symbol_search(
ctx: tractor.Context,
) -> Client:
async with open_cached_client('deribit') as client:
# load all symbols locally for fast search
cache = await client.cache_symbols()
await ctx.started()
async with ctx.open_stream() as stream:
async for pattern in stream:
# results = await client.symbol_info(sym=pattern.upper())
matches = fuzzy.extractBests(
pattern,
cache,
score_cutoff=50,
)
# repack in dict form
await stream.send(
{item[0]['instrument_name']: item[0]
for item in matches}
)