Finally get a chart going! lots of fixes to streaming machinery and custom cryptofeed fork with fixes

deribit
Guillermo Rodriguez 2022-06-26 22:38:23 -03:00
parent e558e5837e
commit 28e025d02e
No known key found for this signature in database
GPG Key ID: EC3AB66D5D83B392
3 changed files with 173 additions and 76 deletions

View File

@ -19,6 +19,7 @@ Deribit backend
"""
import asyncio
from async_generator import aclosing
from contextlib import asynccontextmanager as acm
from datetime import datetime
from typing import (
@ -74,7 +75,7 @@ def get_config() -> dict[str, Any]:
return {}
conf['log'] = {}
conf['log']['filename'] = 'feedhandler.log'
conf['log']['filename'] = '/tmp/feedhandler.log'
conf['log']['level'] = 'WARNING'
return conf
@ -95,9 +96,19 @@ _ohlc_dtype = [
('low', float),
('close', float),
('volume', float),
('bar_wap', float), # will be zeroed by sampler if not filled
# ('bar_wap', float), # will be zeroed by sampler if not filled
]
class JSONRPCResult(BaseModel):
jsonrpc: str = '2.0'
result: dict
usIn: int
usOut: int
usDiff: int
testnet: bool
class KLinesResult(BaseModel):
close: List[float]
cost: List[float]
@ -108,13 +119,30 @@ class KLinesResult(BaseModel):
ticks: List[int]
volume: List[float]
class KLines(BaseModel):
jsonrpc: str = '2.0'
class KLines(JSONRPCResult):
result: KLinesResult
usIn: int
usOut: int
usDiff: int
testnet: bool
class Trade(BaseModel):
trade_seq: int
trade_id: str
timestamp: int
tick_direction: int
price: float
mark_price: float
iv: float
instrument_name: str
index_price: float
direction: str
amount: float
class LastTradesResult(BaseModel):
trades: List[Trade]
has_more: bool
class LastTrades(JSONRPCResult):
result: LastTradesResult
# convert datetime obj timestamp to unixtime in milliseconds
@ -122,6 +150,68 @@ def deribit_timestamp(when):
return int((when.timestamp() * 1000) + (when.microsecond / 1000))
def str_to_cb_sym(name: str) -> Symbol:
base, strike_price, expiry_date, option_type = name.split('-')
quote = base
if option_type == 'put':
option_type = PUT
elif option_type == 'call':
option_type = CALL
else:
raise BaseException("Couldn\'t parse option type")
return Symbol(
base, quote,
type=OPTION,
strike_price=strike_price,
option_type=option_type,
expiry_date=expiry_date,
expiry_normalize=False)
def piker_sym_to_cb_sym(name: str) -> Symbol:
base, expiry_date, strike_price, option_type = tuple(
name.upper().split('-'))
quote = base
if option_type == 'P':
option_type = PUT
elif option_type == 'C':
option_type = CALL
else:
raise BaseException("Couldn\'t parse option type")
return Symbol(
base, quote,
type=OPTION,
strike_price=strike_price,
option_type=option_type,
expiry_date=expiry_date.upper())
def cb_sym_to_deribit_inst(sym: Symbol):
# cryptofeed normalized
cb_norm = ['F', 'G', 'H', 'J', 'K', 'M', 'N', 'Q', 'U', 'V', 'X', 'Z']
# deribit specific
months = ['JAN', 'FEB', 'MAR', 'APR', 'MAY', 'JUN', 'JUL', 'AUG', 'SEP', 'OCT', 'NOV', 'DEC']
exp = sym.expiry_date
# YYMDD
# 01234
year, month, day = (
exp[:2], months[cb_norm.index(exp[2:3])], exp[3:])
otype = 'C' if sym.option_type == CALL else 'P'
return f'{sym.base}-{day}{month}{year}-{sym.strike_price}-{otype}'
class Client:
def __init__(self) -> None:
@ -255,10 +345,24 @@ class Client:
new_bars.append((i,) + tuple(row))
array = np.array(
[i, ], dtype=_ohlc_dtype) if as_np else klines
array = np.array(new_bars, dtype=_ohlc_dtype) if as_np else klines
return array
async def last_trades(
self,
instrument: str,
count: int = 10
):
response = await self._api(
'get_last_trades_by_instrument',
params={
'instrument_name': instrument,
'count': count
}
)
return LastTrades(**response)
@acm
async def get_client() -> Client:
@ -274,42 +378,22 @@ async def open_aio_cryptofeed_relay(
instruments: List[str] = []
) -> None:
conf = get_config()
def format_sym(name: str) -> str:
base, expiry_date, strike_price, option_type = tuple(
name.upper().split('-'))
quote = base
if option_type == 'P':
option_type = PUT
elif option_type == 'C':
option_type = CALL
else:
raise BaseException("Instrument name must end in 'c' for calls or 'p' for puts")
return Symbol(
base, quote,
type=OPTION,
strike_price=strike_price,
option_type=option_type,
expiry_date=expiry_date.upper()).normalized
instruments = [format_sym(i) for i in instruments]
instruments = [piker_sym_to_cb_sym(i) for i in instruments]
async def trade_cb(data: dict, receipt_timestamp):
breakpoint()
# to_trio.send_nowait(('trade', {
# 'symbol': data.symbol.lower(),
# 'last': data.
# 'broker_ts': time.time(),
# 'data': data.to_dict(),
# 'receipt': receipt_timestamp}))
to_trio.send_nowait(('trade', {
'symbol': cb_sym_to_deribit_inst(
str_to_cb_sym(data.symbol)).lower(),
'last': data,
'broker_ts': time.time(),
'data': data.to_dict(),
'receipt': receipt_timestamp
}))
async def l1_book_cb(data: dict, receipt_timestamp):
to_trio.send_nowait(('l1', {
'symbol': data.symbol.lower(),
'symbol': cb_sym_to_deribit_inst(
str_to_cb_sym(data.symbol)).lower(),
'ticks': [
{'type': 'bid',
'price': float(data.bid_price), 'size': float(data.bid_size)},
@ -319,45 +403,42 @@ async def open_aio_cryptofeed_relay(
'price': float(data.ask_price), 'size': float(data.ask_size)},
{'type': 'asize',
'price': float(data.ask_price), 'size': float(data.ask_size)}
]}))
]
}))
fh = FeedHandler(config=conf)
fh = FeedHandler(config=get_config())
fh.run(start_loop=False)
fh.add_feed(
DERIBIT,
channels=[L1_BOOK, TRADES],
channels=[L1_BOOK],
symbols=instruments,
callbacks={
L1_BOOK: L1BookCallback(l1_book_cb),
TRADES: TradeCallback(trade_cb)
})
callbacks={L1_BOOK: l1_book_cb})
fh.add_feed(
DERIBIT,
channels=[TRADES],
symbols=instruments,
callbacks={TRADES: trade_cb})
# sync with trio
to_trio.send_nowait(None)
await from_trio.get()
await asyncio.sleep(float('inf'))
@acm
async def open_cryptofeeds(
instruments: List[str],
to_chart: trio.abc.SendChannel,
# startup sync
task_status: TaskStatus[tuple[dict, dict]] = trio.TASK_STATUS_IGNORED,
):
instruments: List[str]
) -> trio.abc.ReceiveStream:
async with to_asyncio.open_channel_from(
open_aio_cryptofeed_relay,
instruments=instruments,
) as (first, chan):
assert first is None
await chan.send(None)
async with chan.subscribe() as msg_stream:
task_status.started()
async for msg in msg_stream:
await to_chart.send(msg)
yield chan
@acm
@ -420,40 +501,53 @@ async def stream_quotes(
get_console_log(loglevel or tractor.current_actor().loglevel)
sym = symbols[0]
to_chart, from_feed = trio.open_memory_channel(1)
async with (
open_cached_client('deribit') as client,
send_chan as send_chan,
trio.open_nursery() as n
trio.open_nursery() as n,
open_cryptofeeds(symbols) as stream
):
await n.start(
open_cryptofeeds, symbols, to_chart)
init_msgs = {
# pass back token, and bool, signalling if we're the writer
# and that history has been written
sym: {
'symbol_info': {},
'symbol_info': {
'asset_type': 'option'
},
'shm_write_opts': {'sum_tick_vml': False},
'fqsn': sym,
},
}
nsym = piker_sym_to_cb_sym(sym)
# keep client cached for real-time section
cache = await client.cache_symbols()
async with from_feed:
typ, quote = await anext(from_feed)
last_trade = (await client.last_trades(
cb_sym_to_deribit_inst(nsym), count=1)).result.trades[0]
while typ != 'trade':
typ, quote = await anext(from_feed)
first_quote = {
'symbol': sym,
'last': last_trade.price,
'brokerd_ts': last_trade.timestamp,
'ticks': [{
'type': 'trade',
'price': last_trade.price,
'size': last_trade.amount,
'broker_ts': last_trade.timestamp
}]
}
task_status.started((init_msgs, first_quote))
task_status.started((init_msgs, quote))
async with aclosing(stream):
feed_is_live.set()
async for typ, msg in from_feed:
topic = msg['symbol']
await send_chan.send({topic: msg})
async for typ, quote in stream:
topic = quote['symbol']
await send_chan.send({topic: quote})
@tractor.context

View File

@ -18,3 +18,6 @@
# ``asyncvnc`` for sending interactions to ib-gw inside docker
-e git+https://github.com/pikers/asyncvnc.git@main#egg=asyncvnc
# ``cryptofeed`` for connecting to various crypto exchanges + custom fixes
-e git+https://github.com/guilledk/cryptofeed.git@date_parsing#egg=cryptofeed

View File

@ -58,11 +58,11 @@ setup(
# 'trimeter', # not released yet..
# 'tractor',
# asyncvnc,
# 'cryptofeed',
# brokers
'asks==2.4.8',
'ib_insync',
'cryptofeed',
# numerics
'pendulum', # easier datetimes