From 1c833e7175f70a2fe56a88a8174167c964039a98 Mon Sep 17 00:00:00 2001 From: Guillermo Rodriguez Date: Wed, 8 Mar 2023 13:32:47 -0300 Subject: [PATCH] Remove cryptofeeds/asyncio from deribit backend Add hook management to open_jsonrpc_session helper --- piker/brokers/deribit/__init__.py | 10 -- piker/brokers/deribit/api.py | 274 ++++++++++-------------------- piker/brokers/deribit/feed.py | 23 +-- piker/data/_web_bs.py | 51 ++++-- 4 files changed, 136 insertions(+), 222 deletions(-) diff --git a/piker/brokers/deribit/__init__.py b/piker/brokers/deribit/__init__.py index b921ea67..4b71e22c 100644 --- a/piker/brokers/deribit/__init__.py +++ b/piker/brokers/deribit/__init__.py @@ -52,13 +52,3 @@ __enable_modules__: list[str] = [ 'feed', # 'broker', ] - -# passed to ``tractor.ActorNursery.start_actor()`` -_spawn_kwargs = { - 'infect_asyncio': True, -} - -# annotation to let backend agnostic code -# know if ``brokerd`` should be spawned with -# ``tractor``'s aio mode. -_infect_asyncio: bool = True diff --git a/piker/brokers/deribit/api.py b/piker/brokers/deribit/api.py index 62b4b788..59a49a58 100644 --- a/piker/brokers/deribit/api.py +++ b/piker/brokers/deribit/api.py @@ -20,7 +20,6 @@ Deribit backend. ''' from __future__ import annotations import time -import asyncio from contextlib import asynccontextmanager as acm from functools import partial @@ -31,15 +30,6 @@ from typing import ( Callable, ) -from cryptofeed import FeedHandler -from cryptofeed.defines import ( - DERIBIT, - L1_BOOK, - TRADES, - OPTION, - CALL, - PUT, -) import pendulum import trio from trio_typing import TaskStatus @@ -49,8 +39,6 @@ from tractor.trionics import ( broadcast_receiver, maybe_open_context ) -from tractor import to_asyncio -from cryptofeed.symbols import Symbol from piker.data.types import Struct from piker.data._web_bs import ( @@ -59,16 +47,12 @@ from piker.data._web_bs import ( from piker import config from piker.log import get_logger +from piker._cacheables import open_cached_client log = get_logger(__name__) -_spawn_kwargs = { - 'infect_asyncio': True, -} - - _url = 'https://www.deribit.com' _ws_url = 'wss://www.deribit.com/ws/api/v2' _testnet_ws_url = 'wss://test.deribit.com/ws/api/v2' @@ -142,70 +126,12 @@ def deribit_timestamp(when): return int((when.timestamp() * 1000) + (when.microsecond / 1000)) -def str_to_cb_sym(name: str) -> Symbol: - base, strike_price, expiry_date, option_type = name.split('-') - - quote = base - - if option_type == 'put': - option_type = PUT - elif option_type == 'call': - option_type = CALL - else: - raise Exception("Couldn\'t parse option type") - - return Symbol( - base, quote, - type=OPTION, - strike_price=strike_price, - option_type=option_type, - expiry_date=expiry_date, - expiry_normalize=False) +def sym_fmt_piker_to_deribit(sym: str) -> str: + return sym.upper() -def piker_sym_to_cb_sym(name: str) -> Symbol: - base, expiry_date, strike_price, option_type = tuple( - name.upper().split('-')) - - quote = base - - if option_type == 'P': - option_type = PUT - elif option_type == 'C': - option_type = CALL - else: - raise Exception("Couldn\'t parse option type") - - return Symbol( - base, quote, - type=OPTION, - strike_price=strike_price, - option_type=option_type, - expiry_date=expiry_date.upper()) - - -def cb_sym_to_deribit_inst(sym: Symbol): - # cryptofeed normalized - cb_norm = ['F', 'G', 'H', 'J', 'K', 'M', 'N', 'Q', 'U', 'V', 'X', 'Z'] - - # deribit specific - months = [ - 'JAN', 'FEB', 'MAR', - 'APR', 'MAY', 'JUN', - 'JUL', 'AUG', 'SEP', - 'OCT', 'NOV', 'DEC', - ] - - exp = sym.expiry_date - - # YYMDD - # 01234 - year, month, day = ( - exp[:2], months[cb_norm.index(exp[2:3])], exp[3:]) - - otype = 'C' if sym.option_type == CALL else 'P' - - return f'{sym.base}-{day}{month}{year}-{sym.strike_price}-{otype}' +def sym_fmt_deribit_to_piker(sym: str): + return sym.lower() def get_config() -> dict[str, Any]: @@ -214,11 +140,6 @@ def get_config() -> dict[str, Any]: section = conf.get('deribit') - # TODO: document why we send this, basically because logging params - # for cryptofeed - conf['log'] = {} - conf['log']['disabled'] = True - if section is None: log.warning(f'No config section found for deribit in {path}') @@ -227,7 +148,13 @@ def get_config() -> dict[str, Any]: class Client: - def __init__(self, json_rpc: Callable) -> None: + def __init__( + self, + json_rpc: Callable, + update_hooks: Callable, + update_types: Callable, + ) -> None: + self._pairs: dict[str, Any] = None config = get_config().get('deribit', {}) @@ -241,6 +168,8 @@ class Client: self._key_secret = None self.json_rpc = json_rpc + self.update_hooks = update_hooks + self.update_types = update_types @property def currencies(self): @@ -287,7 +216,7 @@ class Client: """Place an order """ params = { - 'instrument_name': symbol.upper(), + 'instrument_name': sym_fmt_piker_to_deribit(symbol), 'amount': size, 'type': 'limit', 'price': price, @@ -328,7 +257,7 @@ class Client: results = resp.result instruments = { - item['instrument_name'].lower(): item + sym_fmt_deribit_to_piker(item['instrument_name']): item for item in results } @@ -359,8 +288,10 @@ class Client: limit=limit ) # repack in dict form - return {item[0]['instrument_name'].lower(): item[0] - for item in matches} + return { + sym_fmt_deribit_to_piker(item[0]['instrument_name']): item[0] + for item in matches + } async def bars( self, @@ -387,7 +318,7 @@ class Client: resp = await self.json_rpc( 'public/get_tradingview_chart_data', params={ - 'instrument_name': instrument.upper(), + 'instrument_name': sym_fmt_piker_to_deribit(instrument), 'start_timestamp': start_time, 'end_timestamp': end_time, 'resolution': '1' @@ -420,13 +351,19 @@ class Client: resp = await self.json_rpc( 'public/get_last_trades_by_instrument', params={ - 'instrument_name': instrument, + 'instrument_name': sym_fmt_piker_to_deribit(instrument), 'count': count }) return LastTradesResult(**resp.result) +class JSONRPCSubRequest(Struct): + method: str + params: dict + jsonrpc: str = '2.0' + + @acm async def get_client( is_brokercheck: bool = False @@ -435,11 +372,11 @@ async def get_client( async with ( trio.open_nursery() as n, open_jsonrpc_session( - _testnet_ws_url, + _ws_url, response_type=JSONRPCResult - ) as json_rpc + ) as control_functions ): - client = Client(json_rpc) + client = Client(*control_functions) _refresh_token: Optional[str] = None _access_token: Optional[str] = None @@ -452,7 +389,7 @@ async def get_client( https://docs.deribit.com/?python#authentication-2 """ - renew_time = 10 + renew_time = 240 access_scope = 'trade:read_write' _expiry_time = time.time() got_access = False @@ -482,7 +419,7 @@ async def get_client( 'scope': access_scope } - resp = await json_rpc('public/auth', params) + resp = await client.json_rpc('public/auth', params) result = resp.result _expiry_time = time.time() + result['expires_in'] @@ -509,97 +446,68 @@ async def get_client( n.cancel_scope.cancel() -@acm -async def open_feed_handler(): - fh = FeedHandler(config=get_config()) - yield fh - await to_asyncio.run_task(fh.stop_async) - - -@acm -async def maybe_open_feed_handler() -> trio.abc.ReceiveStream: - async with maybe_open_context( - acm_func=open_feed_handler, - key='feedhandler', - ) as (cache_hit, fh): - yield fh - - -async def aio_price_feed_relay( - fh: FeedHandler, - instrument: Symbol, - from_trio: asyncio.Queue, - to_trio: trio.abc.SendChannel, -) -> None: - async def _trade(data: dict, receipt_timestamp): - to_trio.send_nowait(('trade', { - 'symbol': cb_sym_to_deribit_inst( - str_to_cb_sym(data.symbol)).lower(), - 'last': data, - 'broker_ts': time.time(), - 'data': data.to_dict(), - 'receipt': receipt_timestamp - })) - - async def _l1(data: dict, receipt_timestamp): - to_trio.send_nowait( - ('l1', { - 'symbol': cb_sym_to_deribit_inst( - str_to_cb_sym(data.symbol)).lower(), - 'ticks': [ - - {'type': 'bid', - 'price': float(data.bid_price), - 'size': float(data.bid_size)}, - - {'type': 'bsize', - 'price': float(data.bid_price), - 'size': float(data.bid_size)}, - - {'type': 'ask', - 'price': float(data.ask_price), - 'size': float(data.ask_size)}, - - {'type': 'asize', - 'price': float(data.ask_price), - 'size': float(data.ask_size)} - ] - }) - ) - - fh.add_feed( - DERIBIT, - channels=[TRADES, L1_BOOK], - symbols=[piker_sym_to_cb_sym(instrument)], - callbacks={ - TRADES: _trade, - L1_BOOK: _l1 - }) - - if not fh.running: - fh.run( - start_loop=False, - install_signal_handlers=False) - - # sync with trio - to_trio.send_nowait(None) - - await asyncio.sleep(float('inf')) - - @acm async def open_price_feed( instrument: str ) -> trio.abc.ReceiveStream: - async with maybe_open_feed_handler() as fh: - async with to_asyncio.open_channel_from( - partial( - aio_price_feed_relay, - fh, - instrument - ) - ) as (first, chan): - yield chan + + instrument_db = sym_fmt_piker_to_deribit(instrument) + + trades_chan = f'trades.{instrument_db}.raw' + book_chan = f'book.{instrument_db}.none.1.100ms' + + channels = [trades_chan, book_chan] + + send_chann, recv_chann = trio.open_memory_channel(0) + async def sub_hook(msg): + chan = msg.params['channel'] + data = msg.params['data'] + if chan == trades_chan: + await send_chann.send(( + 'trade', { + 'symbol': instrument, + 'last': data['price'], + 'brokerd_ts': time.time(), + 'ticks': [{ + 'type': 'trade', + 'price': data['price'], + 'size': data['amount'], + 'broker_ts': data['timestamp'] + }] + } + )) + + elif chan == book_chan: + bid, bsize = data['bids'][0] + ask, asize = data['asks'][0] + await send_chann.send(( + 'l1', { + 'symbol': instrument, + 'ticks': [ + {'type': 'bid', 'price': bid, 'size': bsize}, + {'type': 'bsize', 'price': bid, 'size': bsize}, + {'type': 'ask', 'price': ask, 'size': asize}, + {'type': 'asize', 'price': ask, 'size': asize} + ]} + )) + + async with open_cached_client('deribit') as client: + + client.update_hooks({ + 'request': sub_hook + }) + client.update_types({ + 'request': JSONRPCSubRequest + }) + + resp = await client.json_rpc( + 'private/subscribe', {'channels': channels}) + + assert resp.result == channels + + log.info(f'Subscribed to {channels}') + + yield recv_chann @acm diff --git a/piker/brokers/deribit/feed.py b/piker/brokers/deribit/feed.py index da5211ce..84e04fa1 100644 --- a/piker/brokers/deribit/feed.py +++ b/piker/brokers/deribit/feed.py @@ -40,16 +40,9 @@ from piker.brokers._util import ( from .api import ( Client, Trade, - piker_sym_to_cb_sym, - cb_sym_to_deribit_inst, maybe_open_price_feed ) -_spawn_kwargs = { - 'infect_asyncio': True, -} - - log = get_logger(__name__) @@ -107,10 +100,7 @@ async def stream_quotes( sym = symbols[0] - async with ( - open_cached_client('deribit') as client, - send_chan as send_chan - ): + async with open_cached_client('deribit') as client: init_msgs = { # pass back token, and bool, signalling if we're the writer @@ -118,22 +108,19 @@ async def stream_quotes( sym: { 'symbol_info': { 'asset_type': 'option', - 'price_tick_size': 0.0005 + 'price_tick_size': 0.0005, + 'lot_tick_size': 0.1 }, 'shm_write_opts': {'sum_tick_vml': False}, 'fqsn': sym, }, } - nsym = piker_sym_to_cb_sym(sym) + + last_trades = (await client.last_trades(sym, count=1)).trades async with maybe_open_price_feed(sym) as stream: - await client.cache_symbols() - - last_trades = (await client.last_trades( - cb_sym_to_deribit_inst(nsym), count=1)).trades - if len(last_trades) == 0: last_trade = None async for typ, quote in stream: diff --git a/piker/data/_web_bs.py b/piker/data/_web_bs.py index 2dd7f4af..41aab6ac 100644 --- a/piker/data/_web_bs.py +++ b/piker/data/_web_bs.py @@ -187,7 +187,6 @@ over a NoBsWs. ''' - class JSONRPCResult(Struct): id: int jsonrpc: str = '2.0' @@ -202,9 +201,32 @@ async def open_jsonrpc_session( response_type: type = JSONRPCResult, request_type: Optional[type] = None, request_hook: Optional[Callable] = None, - error_hook: Optional[Callable] = None, + error_hook: Optional[Callable] = None ) -> Callable[[str, dict], dict]: + # xor: this two params need to be passed together or not at all + if bool(request_type) ^ bool(request_hook): + raise ValueError( + 'Need to path both a request_type and request_hook') + + hook_table = { + 'request': request_hook, + 'error': error_hook + } + + types_table = { + 'response': response_type, + 'request': request_type + } + + def update_hooks(new_hooks: dict): + nonlocal hook_table + hook_table.update(new_hooks) + + def update_types(new_types: dict): + nonlocal types_table + types_table.update(new_types) + async with ( trio.open_nursery() as n, open_autorecon_ws(url) as ws @@ -257,8 +279,7 @@ async def open_jsonrpc_session( 'result': _, 'id': mid, } if res_entry := rpc_results.get(mid): - - res_entry['result'] = response_type(**msg) + res_entry['result'] = types_table['response'](**msg) res_entry['event'].set() case { @@ -269,24 +290,32 @@ async def open_jsonrpc_session( f'Unexpected ws msg: {json.dumps(msg, indent=4)}' ) + case { + 'error': error, + 'id': mid + } if res_entry := rpc_results.get(mid): + + res_entry['result'] = types_table['response'](**msg) + res_entry['event'].set() + case { 'method': _, 'params': _, }: - log.debug(f'Recieved\n{msg}') - if request_hook: - await request_hook(request_type(**msg)) + log.info(f'Recieved\n{msg}') + if hook_table['request']: + await hook_table['request'](types_table['request'](**msg)) case { - 'error': error + 'error': error, }: log.warning(f'Recieved\n{error}') - if error_hook: - await error_hook(response_type(**msg)) + if hook_table['error']: + await hook_table['error'](types_table['response'](**msg)) case _: log.warning(f'Unhandled JSON-RPC msg!?\n{msg}') n.start_soon(recv_task) - yield json_rpc + yield json_rpc, update_hooks, update_types n.cancel_scope.cancel()