From 2c2e43d8ac48059373aba934f57031dbe38df09d Mon Sep 17 00:00:00 2001 From: Guillermo Rodriguez Date: Mon, 22 Aug 2022 11:54:17 -0300 Subject: [PATCH] Add comments and update cryptofeed fork url in requirements --- piker/brokers/deribit/api.py | 48 +++++++++++++++++++++++++----------- requirements.txt | 2 +- 2 files changed, 35 insertions(+), 15 deletions(-) diff --git a/piker/brokers/deribit/api.py b/piker/brokers/deribit/api.py index b0ff8c5d..a0d67f71 100644 --- a/piker/brokers/deribit/api.py +++ b/piker/brokers/deribit/api.py @@ -18,6 +18,7 @@ Deribit backend. ''' +import json import time from contextlib import asynccontextmanager as acm, AsyncExitStack @@ -47,6 +48,7 @@ log = get_logger(__name__) _url = 'https://www.deribit.com' _ws_url = 'wss://www.deribit.com/ws/api/v2' +_testnet_ws_url = 'wss://test.deribit.com/ws/api/v2' # Broker specific ohlc schema (rest) @@ -65,15 +67,8 @@ _ohlc_dtype = [ class JSONRPCResult(Struct): jsonrpc: str = '2.0' id: int - result: dict - usIn: int - usOut: int - usDiff: int - testnet: bool - -class JSONRPCHTTPResult(Struct): - jsonrpc: str = '2.0' - result: dict + result: Optional[dict] = None + error: Optional[dict] = None usIn: int usOut: int usDiff: int @@ -154,6 +149,8 @@ class Client: self._refresh_token: Optional[str] = None def _next_json_body(self, method: str, params: Dict): + """get the typical json rpc 2.0 msg body and increment the req id + """ return { 'jsonrpc': '2.0', 'id': next(self._rpc_id), @@ -162,16 +159,23 @@ class Client: } async def start_rpc(self): + """launch message receiver + """ self._n.start_soon(self._recv_task) + # if we have client creds launch auth loop if self._key_id is not None: await self._n.start(self._auth_loop) async def _recv_task(self): + """receives every ws message and stores it in its corresponding result + field, then sets the event to wakeup original sender tasks. + """ while True: msg = JSONRPCResult(**(await self._ws.recv_msg())) if msg.id not in self._rpc_results: + # in case this message wasn't beign accounted for store it self._rpc_results[msg.id] = { 'result': None, 'event': trio.Event() @@ -181,6 +185,9 @@ class Client: self._rpc_results[msg.id]['event'].set() async def json_rpc(self, method: str, params: Dict) -> Dict: + """perform a json rpc call and wait for the result, raise exception in + case of error field present on response + """ msg = self._next_json_body(method, params) _id = msg['id'] @@ -197,14 +204,20 @@ class Client: del self._rpc_results[_id] + if ret.error is not None: + raise Exception(json.dumps(ret.error, indent=4)) + return ret async def _auth_loop( self, task_status: TaskStatus = trio.TASK_STATUS_IGNORED ): - '''https://docs.deribit.com/?python#authentication-2 - ''' + """Background task that adquires a first access token and then will + refresh the access token while the nursery isn't cancelled. + + https://docs.deribit.com/?python#authentication-2 + """ renew_time = 10 access_scope = 'trade:read_write' self._expiry_time = time.time() @@ -212,7 +225,11 @@ class Client: while True: if time.time() - self._expiry_time < renew_time: + # if we are close to token expiry time + if self._refresh_token != None: + # if we have a refresh token already dont need to send + # secret params = { 'grant_type': 'refresh_token', 'refresh_token': self._refresh_token, @@ -220,6 +237,7 @@ class Client: } else: + # we don't have refresh token, send secret to initialize params = { 'grant_type': 'client_credentials', 'client_id': self._key_id, @@ -237,6 +255,8 @@ class Client: self._access_token = result['access_token'] if not got_access: + # first time this loop runs we must indicate task is + # started, we have auth got_access = True task_status.started() @@ -250,9 +270,9 @@ class Client: kind: str = 'option', expired: bool = False ) -> dict[str, Any]: - '''Get symbol info for the exchange. + """Get symbol info for the exchange. - ''' + """ # TODO: we can load from our self._pairs cache # on repeat calls... @@ -375,7 +395,7 @@ class Client: async def get_client() -> Client: async with ( trio.open_nursery() as n, - open_autorecon_ws(_ws_url) as ws + open_autorecon_ws(_testnet_ws_url) as ws ): client = Client(n, ws) diff --git a/requirements.txt b/requirements.txt index cf3801d4..91bb8918 100644 --- a/requirements.txt +++ b/requirements.txt @@ -20,4 +20,4 @@ -e git+https://github.com/pikers/asyncvnc.git@main#egg=asyncvnc # ``cryptofeed`` for connecting to various crypto exchanges + custom fixes --e git+https://github.com/guilledk/cryptofeed.git@date_parsing#egg=cryptofeed +-e git+https://github.com/pikers/cryptofeed.git@date_parsing#egg=cryptofeed