Add comments and update cryptofeed fork url in requirements

deribit
Guillermo Rodriguez 2022-08-22 11:54:17 -03:00
parent 5872095b09
commit b20500c0d9
No known key found for this signature in database
GPG Key ID: EC3AB66D5D83B392
2 changed files with 35 additions and 15 deletions

View File

@ -18,6 +18,7 @@
Deribit backend. Deribit backend.
''' '''
import json
import time import time
from contextlib import asynccontextmanager as acm, AsyncExitStack from contextlib import asynccontextmanager as acm, AsyncExitStack
@ -47,6 +48,7 @@ log = get_logger(__name__)
_url = 'https://www.deribit.com' _url = 'https://www.deribit.com'
_ws_url = 'wss://www.deribit.com/ws/api/v2' _ws_url = 'wss://www.deribit.com/ws/api/v2'
_testnet_ws_url = 'wss://test.deribit.com/ws/api/v2'
# Broker specific ohlc schema (rest) # Broker specific ohlc schema (rest)
@ -65,15 +67,8 @@ _ohlc_dtype = [
class JSONRPCResult(Struct): class JSONRPCResult(Struct):
jsonrpc: str = '2.0' jsonrpc: str = '2.0'
id: int id: int
result: dict result: Optional[dict] = None
usIn: int error: Optional[dict] = None
usOut: int
usDiff: int
testnet: bool
class JSONRPCHTTPResult(Struct):
jsonrpc: str = '2.0'
result: dict
usIn: int usIn: int
usOut: int usOut: int
usDiff: int usDiff: int
@ -154,6 +149,8 @@ class Client:
self._refresh_token: Optional[str] = None self._refresh_token: Optional[str] = None
def _next_json_body(self, method: str, params: Dict): def _next_json_body(self, method: str, params: Dict):
"""get the typical json rpc 2.0 msg body and increment the req id
"""
return { return {
'jsonrpc': '2.0', 'jsonrpc': '2.0',
'id': next(self._rpc_id), 'id': next(self._rpc_id),
@ -162,16 +159,23 @@ class Client:
} }
async def start_rpc(self): async def start_rpc(self):
"""launch message receiver
"""
self._n.start_soon(self._recv_task) self._n.start_soon(self._recv_task)
# if we have client creds launch auth loop
if self._key_id is not None: if self._key_id is not None:
await self._n.start(self._auth_loop) await self._n.start(self._auth_loop)
async def _recv_task(self): async def _recv_task(self):
"""receives every ws message and stores it in its corresponding result
field, then sets the event to wakeup original sender tasks.
"""
while True: while True:
msg = JSONRPCResult(**(await self._ws.recv_msg())) msg = JSONRPCResult(**(await self._ws.recv_msg()))
if msg.id not in self._rpc_results: if msg.id not in self._rpc_results:
# in case this message wasn't beign accounted for store it
self._rpc_results[msg.id] = { self._rpc_results[msg.id] = {
'result': None, 'result': None,
'event': trio.Event() 'event': trio.Event()
@ -181,6 +185,9 @@ class Client:
self._rpc_results[msg.id]['event'].set() self._rpc_results[msg.id]['event'].set()
async def json_rpc(self, method: str, params: Dict) -> Dict: async def json_rpc(self, method: str, params: Dict) -> Dict:
"""perform a json rpc call and wait for the result, raise exception in
case of error field present on response
"""
msg = self._next_json_body(method, params) msg = self._next_json_body(method, params)
_id = msg['id'] _id = msg['id']
@ -197,14 +204,20 @@ class Client:
del self._rpc_results[_id] del self._rpc_results[_id]
if ret.error is not None:
raise Exception(json.dumps(ret.error, indent=4))
return ret return ret
async def _auth_loop( async def _auth_loop(
self, self,
task_status: TaskStatus = trio.TASK_STATUS_IGNORED task_status: TaskStatus = trio.TASK_STATUS_IGNORED
): ):
'''https://docs.deribit.com/?python#authentication-2 """Background task that adquires a first access token and then will
''' refresh the access token while the nursery isn't cancelled.
https://docs.deribit.com/?python#authentication-2
"""
renew_time = 10 renew_time = 10
access_scope = 'trade:read_write' access_scope = 'trade:read_write'
self._expiry_time = time.time() self._expiry_time = time.time()
@ -212,7 +225,11 @@ class Client:
while True: while True:
if time.time() - self._expiry_time < renew_time: if time.time() - self._expiry_time < renew_time:
# if we are close to token expiry time
if self._refresh_token != None: if self._refresh_token != None:
# if we have a refresh token already dont need to send
# secret
params = { params = {
'grant_type': 'refresh_token', 'grant_type': 'refresh_token',
'refresh_token': self._refresh_token, 'refresh_token': self._refresh_token,
@ -220,6 +237,7 @@ class Client:
} }
else: else:
# we don't have refresh token, send secret to initialize
params = { params = {
'grant_type': 'client_credentials', 'grant_type': 'client_credentials',
'client_id': self._key_id, 'client_id': self._key_id,
@ -237,6 +255,8 @@ class Client:
self._access_token = result['access_token'] self._access_token = result['access_token']
if not got_access: if not got_access:
# first time this loop runs we must indicate task is
# started, we have auth
got_access = True got_access = True
task_status.started() task_status.started()
@ -250,9 +270,9 @@ class Client:
kind: str = 'option', kind: str = 'option',
expired: bool = False expired: bool = False
) -> dict[str, Any]: ) -> dict[str, Any]:
'''Get symbol info for the exchange. """Get symbol info for the exchange.
''' """
# TODO: we can load from our self._pairs cache # TODO: we can load from our self._pairs cache
# on repeat calls... # on repeat calls...
@ -375,7 +395,7 @@ class Client:
async def get_client() -> Client: async def get_client() -> Client:
async with ( async with (
trio.open_nursery() as n, trio.open_nursery() as n,
open_autorecon_ws(_ws_url) as ws open_autorecon_ws(_testnet_ws_url) as ws
): ):
client = Client(n, ws) client = Client(n, ws)

View File

@ -20,4 +20,4 @@
-e git+https://github.com/pikers/asyncvnc.git@main#egg=asyncvnc -e git+https://github.com/pikers/asyncvnc.git@main#egg=asyncvnc
# ``cryptofeed`` for connecting to various crypto exchanges + custom fixes # ``cryptofeed`` for connecting to various crypto exchanges + custom fixes
-e git+https://github.com/guilledk/cryptofeed.git@date_parsing#egg=cryptofeed -e git+https://github.com/pikers/cryptofeed.git@date_parsing#egg=cryptofeed