From 92090b01b8795e7fb9a91f9cc9e696b10ea3c372 Mon Sep 17 00:00:00 2001 From: Guillermo Rodriguez Date: Sun, 21 Aug 2022 23:01:03 -0300 Subject: [PATCH] Begin jsonrpc over ws refactor --- piker/brokers/deribit/api.py | 180 +++++++++++++++++++++++++++++----- piker/brokers/deribit/feed.py | 27 +---- piker/data/_web_bs.py | 20 ++-- 3 files changed, 174 insertions(+), 53 deletions(-) diff --git a/piker/brokers/deribit/api.py b/piker/brokers/deribit/api.py index f8c31bda..3b1b048a 100644 --- a/piker/brokers/deribit/api.py +++ b/piker/brokers/deribit/api.py @@ -1,3 +1,6 @@ +# piker: trading gear for hackers +# Copyright (C) Guillermo Rodriguez (in stewardship for piker0) + # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by # the Free Software Foundation, either version 3 of the License, or @@ -15,17 +18,22 @@ Deribit backend. ''' +import time -from contextlib import asynccontextmanager as acm +from contextlib import asynccontextmanager as acm, AsyncExitStack +from itertools import count from datetime import datetime -from typing import Any, Optional, List +from typing import Any, List, Dict, Optional, Iterable import pendulum import asks +import trio +from trio_typing import Nursery, TaskStatus from fuzzywuzzy import process as fuzzy import numpy as np from piker.data.types import Struct +from piker.data._web_bs import NoBsWs, open_autorecon_ws from .._util import resproc @@ -38,6 +46,7 @@ log = get_logger(__name__) _url = 'https://www.deribit.com' +_ws_url = 'wss://www.deribit.com/ws/api/v2' # Broker specific ohlc schema (rest) @@ -54,6 +63,15 @@ _ohlc_dtype = [ class JSONRPCResult(Struct): + jsonrpc: str = '2.0' + id: int + result: dict + usIn: int + usOut: int + usDiff: int + testnet: bool + +class JSONRPCHTTPResult(Struct): jsonrpc: str = '2.0' result: dict usIn: int @@ -95,13 +113,131 @@ def deribit_timestamp(when): return int((when.timestamp() * 1000) + (when.microsecond / 1000)) +def get_config() -> dict[str, Any]: + + conf, path = config.load() + + section = conf.get('deribit') + + if section is None: + log.warning(f'No config section found for deribit in {path}') + return {} + + conf['log'] = {} + conf['log']['disabled'] = True + + return conf + + class Client: - def __init__(self) -> None: + def __init__(self, n: Nursery, ws: NoBsWs) -> None: self._sesh = asks.Session(connections=4) self._sesh.base_location = _url self._pairs: dict[str, Any] = {} + config = get_config()['deribit'] + self._key_id = config['key_id'] + self._key_secret = config['key_secret'] + + self._ws = ws + self._n = n + + self._rpc_id: Iterable = count(0) + self._rpc_results: Dict[int, Dict] = {} + + self._expiry_time: int = float('inf') + self._access_token: Optional[str] = None + self._refresh_token: Optional[str] = None + + def _next_json_body(self, method: str, params: Dict): + return { + 'jsonrpc': '2.0', + 'id': next(self._rpc_id), + 'method': method, + 'params': params + } + + async def start_rpc(self): + self._n.start_soon(self._recv_task) + await self._n.start(self._auth_loop) + + async def _recv_task(self): + while True: + msg = JSONRPCResult(**(await self._ws.recv_msg())) + + if msg.id not in self._rpc_results: + self._rpc_results[msg.id] = { + 'result': None, + 'event': trio.Event() + } + + self._rpc_results[msg.id]['result'] = msg + self._rpc_results[msg.id]['event'].set() + + async def json_rpc(self, method: str, params: Dict) -> Dict: + msg = self._next_json_body(method, params) + _id = msg['id'] + + self._rpc_results[_id] = { + 'result': None, + 'event': trio.Event() + } + + await self._ws.send_msg(msg) + + await self._rpc_results[_id]['event'].wait() + + ret = self._rpc_results[_id]['result'] + + del self._rpc_results[_id] + + return ret + + async def _auth_loop( + self, + task_status: TaskStatus = trio.TASK_STATUS_IGNORED + ): + '''https://docs.deribit.com/?python#authentication-2 + ''' + renew_time = 10 + access_scope = 'trade:read_write' + self._expiry_time = time.time() + got_access = False + + while True: + if time.time() - self._expiry_time < renew_time: + if self._refresh_token != None: + params = { + 'grant_type': 'refresh_token', + 'refresh_token': self._refresh_token, + 'scope': access_scope + } + + else: + params = { + 'grant_type': 'client_credentials', + 'client_id': self._key_id, + 'client_secret': self._key_secret, + 'scope': access_scope + } + + resp = await self.json_rpc('public/auth', params) + result = resp.result + + self._expiry_time = time.time() + result['expires_in'] + self._refresh_token = result['refresh_token'] + + if 'access_token' in result: + self._access_token = result['access_token'] + + if not got_access: + got_access = True + task_status.started() + + else: + await trio.sleep(renew_time / 2) + async def _api( self, method: str, @@ -134,10 +270,8 @@ class Client: 'expired': str(expired).lower() } - resp = await self._api( - 'get_instruments', params=params) - - results = resp['result'] + resp = await self.json_rpc('public/get_instruments', params) + results = resp.result instruments = { item['instrument_name']: item for item in results} @@ -195,19 +329,16 @@ class Client: end_time = deribit_timestamp(end_dt) # https://docs.deribit.com/#public-get_tradingview_chart_data - response = await self._api( - 'get_tradingview_chart_data', + resp = await self.json_rpc( + 'public/get_tradingview_chart_data', params={ 'instrument_name': instrument.upper(), 'start_timestamp': start_time, 'end_timestamp': end_time, 'resolution': '1' - } - ) + }) - klines = JSONRPCResult(**response) - - result = KLinesResult(**klines.result) + result = KLinesResult(**resp.result) new_bars = [] for i in range(len(result.close)): @@ -237,19 +368,24 @@ class Client: instrument: str, count: int = 10 ): - response = await self._api( - 'get_last_trades_by_instrument', + resp = await self.json_rpc( + 'public/get_last_trades_by_instrument', params={ 'instrument_name': instrument, 'count': count - } - ) + }) - return LastTradesResult(**(JSONRPCResult(**response).result)) + return LastTradesResult(**resp.result) @acm async def get_client() -> Client: - client = Client() - await client.cache_symbols() - yield client + async with ( + trio.open_nursery() as n, + open_autorecon_ws(_ws_url) as ws + ): + + client = Client(n, ws) + await client.start_rpc() + await client.cache_symbols() + yield client diff --git a/piker/brokers/deribit/feed.py b/piker/brokers/deribit/feed.py index 59e2571f..ae4d3bfc 100644 --- a/piker/brokers/deribit/feed.py +++ b/piker/brokers/deribit/feed.py @@ -1,3 +1,6 @@ +# piker: trading gear for hackers +# Copyright (C) Guillermo Rodriguez (in stewardship for piker0) + # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by # the Free Software Foundation, either version 3 of the License, or @@ -31,7 +34,6 @@ import numpy as np import tractor from tractor import to_asyncio -from piker import config from piker._cacheables import open_cached_client from piker.log import get_logger, get_console_log from piker.data import ShmArray @@ -47,32 +49,13 @@ from cryptofeed.defines import ( ) from cryptofeed.symbols import Symbol -from .api import Client, Trade +from .api import Client, Trade, get_config _spawn_kwargs = { 'infect_asyncio': True, } -def get_config() -> dict[str, Any]: - - conf, path = config.load() - - section = conf.get('deribit') - - if section is None: - log.warning(f'No config section found for deribit in {path}') - return {} - - conf['log'] = {} - conf['log']['disabled'] = True - -# conf['log']['filename'] = '/tmp/feedhandler.log' -# conf['log']['level'] = 'WARNING' - - return conf - - log = get_logger(__name__) @@ -128,7 +111,7 @@ def cb_sym_to_deribit_inst(sym: Symbol): # deribit specific months = ['JAN', 'FEB', 'MAR', 'APR', 'MAY', 'JUN', 'JUL', 'AUG', 'SEP', 'OCT', 'NOV', 'DEC'] - + exp = sym.expiry_date # YYMDD diff --git a/piker/data/_web_bs.py b/piker/data/_web_bs.py index 9c3fa796..64d447df 100644 --- a/piker/data/_web_bs.py +++ b/piker/data/_web_bs.py @@ -20,7 +20,7 @@ ToOlS fOr CoPInG wITh "tHE wEB" protocols. """ from contextlib import asynccontextmanager, AsyncExitStack from types import ModuleType -from typing import Any, Callable, AsyncGenerator +from typing import Any, Optional, Callable, AsyncGenerator import json import trio @@ -54,8 +54,8 @@ class NoBsWs: self, url: str, stack: AsyncExitStack, - fixture: Callable, - serializer: ModuleType = json, + fixture: Optional[Callable] = None, + serializer: ModuleType = json ): self.url = url self.fixture = fixture @@ -80,12 +80,14 @@ class NoBsWs: self._ws = await self._stack.enter_async_context( trio_websocket.open_websocket_url(self.url) ) - # rerun user code fixture - ret = await self._stack.enter_async_context( - self.fixture(self) - ) - assert ret is None + if self.fixture is not None: + # rerun user code fixture + ret = await self._stack.enter_async_context( + self.fixture(self) + ) + + assert ret is None log.info(f'Connection success: {self.url}') return self._ws @@ -127,7 +129,7 @@ async def open_autorecon_ws( url: str, # TODO: proper type annot smh - fixture: Callable, + fixture: Optional[Callable] = None, ) -> AsyncGenerator[tuple[...], NoBsWs]: """Apparently we can QoS for all sorts of reasons..so catch em.