From 28e025d02eb04dd759f9731ab142f4f2b4d8dc92 Mon Sep 17 00:00:00 2001 From: Guillermo Rodriguez Date: Sun, 26 Jun 2022 22:38:23 -0300 Subject: [PATCH] Finally get a chart going! lots of fixes to streaming machinery and custom cryptofeed fork with fixes --- piker/brokers/deribit.py | 244 +++++++++++++++++++++++++++------------ requirements.txt | 3 + setup.py | 2 +- 3 files changed, 173 insertions(+), 76 deletions(-) diff --git a/piker/brokers/deribit.py b/piker/brokers/deribit.py index 4c43e4dd..b787d2b5 100644 --- a/piker/brokers/deribit.py +++ b/piker/brokers/deribit.py @@ -19,6 +19,7 @@ Deribit backend """ import asyncio +from async_generator import aclosing from contextlib import asynccontextmanager as acm from datetime import datetime from typing import ( @@ -74,7 +75,7 @@ def get_config() -> dict[str, Any]: return {} conf['log'] = {} - conf['log']['filename'] = 'feedhandler.log' + conf['log']['filename'] = '/tmp/feedhandler.log' conf['log']['level'] = 'WARNING' return conf @@ -95,9 +96,19 @@ _ohlc_dtype = [ ('low', float), ('close', float), ('volume', float), - ('bar_wap', float), # will be zeroed by sampler if not filled + # ('bar_wap', float), # will be zeroed by sampler if not filled ] + +class JSONRPCResult(BaseModel): + jsonrpc: str = '2.0' + result: dict + usIn: int + usOut: int + usDiff: int + testnet: bool + + class KLinesResult(BaseModel): close: List[float] cost: List[float] @@ -108,13 +119,30 @@ class KLinesResult(BaseModel): ticks: List[int] volume: List[float] -class KLines(BaseModel): - jsonrpc: str = '2.0' + +class KLines(JSONRPCResult): result: KLinesResult - usIn: int - usOut: int - usDiff: int - testnet: bool + + +class Trade(BaseModel): + trade_seq: int + trade_id: str + timestamp: int + tick_direction: int + price: float + mark_price: float + iv: float + instrument_name: str + index_price: float + direction: str + amount: float + +class LastTradesResult(BaseModel): + trades: List[Trade] + has_more: bool + +class LastTrades(JSONRPCResult): + result: LastTradesResult # convert datetime obj timestamp to unixtime in milliseconds @@ -122,6 +150,68 @@ def deribit_timestamp(when): return int((when.timestamp() * 1000) + (when.microsecond / 1000)) +def str_to_cb_sym(name: str) -> Symbol: + base, strike_price, expiry_date, option_type = name.split('-') + + quote = base + + if option_type == 'put': + option_type = PUT + elif option_type == 'call': + option_type = CALL + else: + raise BaseException("Couldn\'t parse option type") + + return Symbol( + base, quote, + type=OPTION, + strike_price=strike_price, + option_type=option_type, + expiry_date=expiry_date, + expiry_normalize=False) + + + +def piker_sym_to_cb_sym(name: str) -> Symbol: + base, expiry_date, strike_price, option_type = tuple( + name.upper().split('-')) + + quote = base + + if option_type == 'P': + option_type = PUT + elif option_type == 'C': + option_type = CALL + else: + raise BaseException("Couldn\'t parse option type") + + return Symbol( + base, quote, + type=OPTION, + strike_price=strike_price, + option_type=option_type, + expiry_date=expiry_date.upper()) + + +def cb_sym_to_deribit_inst(sym: Symbol): + # cryptofeed normalized + cb_norm = ['F', 'G', 'H', 'J', 'K', 'M', 'N', 'Q', 'U', 'V', 'X', 'Z'] + + # deribit specific + months = ['JAN', 'FEB', 'MAR', 'APR', 'MAY', 'JUN', 'JUL', 'AUG', 'SEP', 'OCT', 'NOV', 'DEC'] + + exp = sym.expiry_date + + # YYMDD + # 01234 + year, month, day = ( + exp[:2], months[cb_norm.index(exp[2:3])], exp[3:]) + + otype = 'C' if sym.option_type == CALL else 'P' + + return f'{sym.base}-{day}{month}{year}-{sym.strike_price}-{otype}' + + class Client: def __init__(self) -> None: @@ -255,10 +345,24 @@ class Client: new_bars.append((i,) + tuple(row)) - array = np.array( - [i, ], dtype=_ohlc_dtype) if as_np else klines + array = np.array(new_bars, dtype=_ohlc_dtype) if as_np else klines return array + async def last_trades( + self, + instrument: str, + count: int = 10 + ): + response = await self._api( + 'get_last_trades_by_instrument', + params={ + 'instrument_name': instrument, + 'count': count + } + ) + + return LastTrades(**response) + @acm async def get_client() -> Client: @@ -274,42 +378,22 @@ async def open_aio_cryptofeed_relay( instruments: List[str] = [] ) -> None: - conf = get_config() - - def format_sym(name: str) -> str: - base, expiry_date, strike_price, option_type = tuple( - name.upper().split('-')) - - quote = base - - if option_type == 'P': - option_type = PUT - elif option_type == 'C': - option_type = CALL - else: - raise BaseException("Instrument name must end in 'c' for calls or 'p' for puts") - - return Symbol( - base, quote, - type=OPTION, - strike_price=strike_price, - option_type=option_type, - expiry_date=expiry_date.upper()).normalized - - instruments = [format_sym(i) for i in instruments] + instruments = [piker_sym_to_cb_sym(i) for i in instruments] async def trade_cb(data: dict, receipt_timestamp): - breakpoint() - # to_trio.send_nowait(('trade', { - # 'symbol': data.symbol.lower(), - # 'last': data. - # 'broker_ts': time.time(), - # 'data': data.to_dict(), - # 'receipt': receipt_timestamp})) + to_trio.send_nowait(('trade', { + 'symbol': cb_sym_to_deribit_inst( + str_to_cb_sym(data.symbol)).lower(), + 'last': data, + 'broker_ts': time.time(), + 'data': data.to_dict(), + 'receipt': receipt_timestamp + })) async def l1_book_cb(data: dict, receipt_timestamp): to_trio.send_nowait(('l1', { - 'symbol': data.symbol.lower(), + 'symbol': cb_sym_to_deribit_inst( + str_to_cb_sym(data.symbol)).lower(), 'ticks': [ {'type': 'bid', 'price': float(data.bid_price), 'size': float(data.bid_size)}, @@ -319,45 +403,42 @@ async def open_aio_cryptofeed_relay( 'price': float(data.ask_price), 'size': float(data.ask_size)}, {'type': 'asize', 'price': float(data.ask_price), 'size': float(data.ask_size)} - ]})) + ] + })) - fh = FeedHandler(config=conf) + fh = FeedHandler(config=get_config()) fh.run(start_loop=False) fh.add_feed( DERIBIT, - channels=[L1_BOOK, TRADES], + channels=[L1_BOOK], symbols=instruments, - callbacks={ - L1_BOOK: L1BookCallback(l1_book_cb), - TRADES: TradeCallback(trade_cb) - }) + callbacks={L1_BOOK: l1_book_cb}) + + fh.add_feed( + DERIBIT, + channels=[TRADES], + symbols=instruments, + callbacks={TRADES: trade_cb}) # sync with trio to_trio.send_nowait(None) - await from_trio.get() + await asyncio.sleep(float('inf')) +@acm async def open_cryptofeeds( - instruments: List[str], - to_chart: trio.abc.SendChannel, - # startup sync - task_status: TaskStatus[tuple[dict, dict]] = trio.TASK_STATUS_IGNORED, -): + instruments: List[str] + +) -> trio.abc.ReceiveStream: + async with to_asyncio.open_channel_from( open_aio_cryptofeed_relay, instruments=instruments, ) as (first, chan): - assert first is None - - await chan.send(None) - - async with chan.subscribe() as msg_stream: - task_status.started() - async for msg in msg_stream: - await to_chart.send(msg) + yield chan @acm @@ -420,40 +501,53 @@ async def stream_quotes( get_console_log(loglevel or tractor.current_actor().loglevel) sym = symbols[0] - to_chart, from_feed = trio.open_memory_channel(1) async with ( open_cached_client('deribit') as client, send_chan as send_chan, - trio.open_nursery() as n + trio.open_nursery() as n, + open_cryptofeeds(symbols) as stream ): - await n.start( - open_cryptofeeds, symbols, to_chart) init_msgs = { # pass back token, and bool, signalling if we're the writer # and that history has been written sym: { - 'symbol_info': {}, + 'symbol_info': { + 'asset_type': 'option' + }, 'shm_write_opts': {'sum_tick_vml': False}, 'fqsn': sym, }, } + nsym = piker_sym_to_cb_sym(sym) + # keep client cached for real-time section cache = await client.cache_symbols() - async with from_feed: - typ, quote = await anext(from_feed) + last_trade = (await client.last_trades( + cb_sym_to_deribit_inst(nsym), count=1)).result.trades[0] - while typ != 'trade': - typ, quote = await anext(from_feed) + first_quote = { + 'symbol': sym, + 'last': last_trade.price, + 'brokerd_ts': last_trade.timestamp, + 'ticks': [{ + 'type': 'trade', + 'price': last_trade.price, + 'size': last_trade.amount, + 'broker_ts': last_trade.timestamp + }] + } + task_status.started((init_msgs, first_quote)) - task_status.started((init_msgs, quote)) + async with aclosing(stream): + feed_is_live.set() - async for typ, msg in from_feed: - topic = msg['symbol'] - await send_chan.send({topic: msg}) + async for typ, quote in stream: + topic = quote['symbol'] + await send_chan.send({topic: quote}) @tractor.context diff --git a/requirements.txt b/requirements.txt index 93d2aaa2..cf3801d4 100644 --- a/requirements.txt +++ b/requirements.txt @@ -18,3 +18,6 @@ # ``asyncvnc`` for sending interactions to ib-gw inside docker -e git+https://github.com/pikers/asyncvnc.git@main#egg=asyncvnc + +# ``cryptofeed`` for connecting to various crypto exchanges + custom fixes +-e git+https://github.com/guilledk/cryptofeed.git@date_parsing#egg=cryptofeed diff --git a/setup.py b/setup.py index cee70bfa..44d360fa 100755 --- a/setup.py +++ b/setup.py @@ -58,11 +58,11 @@ setup( # 'trimeter', # not released yet.. # 'tractor', # asyncvnc, + # 'cryptofeed', # brokers 'asks==2.4.8', 'ib_insync', - 'cryptofeed', # numerics 'pendulum', # easier datetimes