Add comments and update cryptofeed fork url in requirements
parent
212b3d620d
commit
2c2e43d8ac
|
@ -18,6 +18,7 @@
|
||||||
Deribit backend.
|
Deribit backend.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
|
import json
|
||||||
import time
|
import time
|
||||||
|
|
||||||
from contextlib import asynccontextmanager as acm, AsyncExitStack
|
from contextlib import asynccontextmanager as acm, AsyncExitStack
|
||||||
|
@ -47,6 +48,7 @@ log = get_logger(__name__)
|
||||||
|
|
||||||
_url = 'https://www.deribit.com'
|
_url = 'https://www.deribit.com'
|
||||||
_ws_url = 'wss://www.deribit.com/ws/api/v2'
|
_ws_url = 'wss://www.deribit.com/ws/api/v2'
|
||||||
|
_testnet_ws_url = 'wss://test.deribit.com/ws/api/v2'
|
||||||
|
|
||||||
|
|
||||||
# Broker specific ohlc schema (rest)
|
# Broker specific ohlc schema (rest)
|
||||||
|
@ -65,15 +67,8 @@ _ohlc_dtype = [
|
||||||
class JSONRPCResult(Struct):
|
class JSONRPCResult(Struct):
|
||||||
jsonrpc: str = '2.0'
|
jsonrpc: str = '2.0'
|
||||||
id: int
|
id: int
|
||||||
result: dict
|
result: Optional[dict] = None
|
||||||
usIn: int
|
error: Optional[dict] = None
|
||||||
usOut: int
|
|
||||||
usDiff: int
|
|
||||||
testnet: bool
|
|
||||||
|
|
||||||
class JSONRPCHTTPResult(Struct):
|
|
||||||
jsonrpc: str = '2.0'
|
|
||||||
result: dict
|
|
||||||
usIn: int
|
usIn: int
|
||||||
usOut: int
|
usOut: int
|
||||||
usDiff: int
|
usDiff: int
|
||||||
|
@ -154,6 +149,8 @@ class Client:
|
||||||
self._refresh_token: Optional[str] = None
|
self._refresh_token: Optional[str] = None
|
||||||
|
|
||||||
def _next_json_body(self, method: str, params: Dict):
|
def _next_json_body(self, method: str, params: Dict):
|
||||||
|
"""get the typical json rpc 2.0 msg body and increment the req id
|
||||||
|
"""
|
||||||
return {
|
return {
|
||||||
'jsonrpc': '2.0',
|
'jsonrpc': '2.0',
|
||||||
'id': next(self._rpc_id),
|
'id': next(self._rpc_id),
|
||||||
|
@ -162,16 +159,23 @@ class Client:
|
||||||
}
|
}
|
||||||
|
|
||||||
async def start_rpc(self):
|
async def start_rpc(self):
|
||||||
|
"""launch message receiver
|
||||||
|
"""
|
||||||
self._n.start_soon(self._recv_task)
|
self._n.start_soon(self._recv_task)
|
||||||
|
|
||||||
|
# if we have client creds launch auth loop
|
||||||
if self._key_id is not None:
|
if self._key_id is not None:
|
||||||
await self._n.start(self._auth_loop)
|
await self._n.start(self._auth_loop)
|
||||||
|
|
||||||
async def _recv_task(self):
|
async def _recv_task(self):
|
||||||
|
"""receives every ws message and stores it in its corresponding result
|
||||||
|
field, then sets the event to wakeup original sender tasks.
|
||||||
|
"""
|
||||||
while True:
|
while True:
|
||||||
msg = JSONRPCResult(**(await self._ws.recv_msg()))
|
msg = JSONRPCResult(**(await self._ws.recv_msg()))
|
||||||
|
|
||||||
if msg.id not in self._rpc_results:
|
if msg.id not in self._rpc_results:
|
||||||
|
# in case this message wasn't beign accounted for store it
|
||||||
self._rpc_results[msg.id] = {
|
self._rpc_results[msg.id] = {
|
||||||
'result': None,
|
'result': None,
|
||||||
'event': trio.Event()
|
'event': trio.Event()
|
||||||
|
@ -181,6 +185,9 @@ class Client:
|
||||||
self._rpc_results[msg.id]['event'].set()
|
self._rpc_results[msg.id]['event'].set()
|
||||||
|
|
||||||
async def json_rpc(self, method: str, params: Dict) -> Dict:
|
async def json_rpc(self, method: str, params: Dict) -> Dict:
|
||||||
|
"""perform a json rpc call and wait for the result, raise exception in
|
||||||
|
case of error field present on response
|
||||||
|
"""
|
||||||
msg = self._next_json_body(method, params)
|
msg = self._next_json_body(method, params)
|
||||||
_id = msg['id']
|
_id = msg['id']
|
||||||
|
|
||||||
|
@ -197,14 +204,20 @@ class Client:
|
||||||
|
|
||||||
del self._rpc_results[_id]
|
del self._rpc_results[_id]
|
||||||
|
|
||||||
|
if ret.error is not None:
|
||||||
|
raise Exception(json.dumps(ret.error, indent=4))
|
||||||
|
|
||||||
return ret
|
return ret
|
||||||
|
|
||||||
async def _auth_loop(
|
async def _auth_loop(
|
||||||
self,
|
self,
|
||||||
task_status: TaskStatus = trio.TASK_STATUS_IGNORED
|
task_status: TaskStatus = trio.TASK_STATUS_IGNORED
|
||||||
):
|
):
|
||||||
'''https://docs.deribit.com/?python#authentication-2
|
"""Background task that adquires a first access token and then will
|
||||||
'''
|
refresh the access token while the nursery isn't cancelled.
|
||||||
|
|
||||||
|
https://docs.deribit.com/?python#authentication-2
|
||||||
|
"""
|
||||||
renew_time = 10
|
renew_time = 10
|
||||||
access_scope = 'trade:read_write'
|
access_scope = 'trade:read_write'
|
||||||
self._expiry_time = time.time()
|
self._expiry_time = time.time()
|
||||||
|
@ -212,7 +225,11 @@ class Client:
|
||||||
|
|
||||||
while True:
|
while True:
|
||||||
if time.time() - self._expiry_time < renew_time:
|
if time.time() - self._expiry_time < renew_time:
|
||||||
|
# if we are close to token expiry time
|
||||||
|
|
||||||
if self._refresh_token != None:
|
if self._refresh_token != None:
|
||||||
|
# if we have a refresh token already dont need to send
|
||||||
|
# secret
|
||||||
params = {
|
params = {
|
||||||
'grant_type': 'refresh_token',
|
'grant_type': 'refresh_token',
|
||||||
'refresh_token': self._refresh_token,
|
'refresh_token': self._refresh_token,
|
||||||
|
@ -220,6 +237,7 @@ class Client:
|
||||||
}
|
}
|
||||||
|
|
||||||
else:
|
else:
|
||||||
|
# we don't have refresh token, send secret to initialize
|
||||||
params = {
|
params = {
|
||||||
'grant_type': 'client_credentials',
|
'grant_type': 'client_credentials',
|
||||||
'client_id': self._key_id,
|
'client_id': self._key_id,
|
||||||
|
@ -237,6 +255,8 @@ class Client:
|
||||||
self._access_token = result['access_token']
|
self._access_token = result['access_token']
|
||||||
|
|
||||||
if not got_access:
|
if not got_access:
|
||||||
|
# first time this loop runs we must indicate task is
|
||||||
|
# started, we have auth
|
||||||
got_access = True
|
got_access = True
|
||||||
task_status.started()
|
task_status.started()
|
||||||
|
|
||||||
|
@ -250,9 +270,9 @@ class Client:
|
||||||
kind: str = 'option',
|
kind: str = 'option',
|
||||||
expired: bool = False
|
expired: bool = False
|
||||||
) -> dict[str, Any]:
|
) -> dict[str, Any]:
|
||||||
'''Get symbol info for the exchange.
|
"""Get symbol info for the exchange.
|
||||||
|
|
||||||
'''
|
"""
|
||||||
# TODO: we can load from our self._pairs cache
|
# TODO: we can load from our self._pairs cache
|
||||||
# on repeat calls...
|
# on repeat calls...
|
||||||
|
|
||||||
|
@ -375,7 +395,7 @@ class Client:
|
||||||
async def get_client() -> Client:
|
async def get_client() -> Client:
|
||||||
async with (
|
async with (
|
||||||
trio.open_nursery() as n,
|
trio.open_nursery() as n,
|
||||||
open_autorecon_ws(_ws_url) as ws
|
open_autorecon_ws(_testnet_ws_url) as ws
|
||||||
):
|
):
|
||||||
|
|
||||||
client = Client(n, ws)
|
client = Client(n, ws)
|
||||||
|
|
|
@ -20,4 +20,4 @@
|
||||||
-e git+https://github.com/pikers/asyncvnc.git@main#egg=asyncvnc
|
-e git+https://github.com/pikers/asyncvnc.git@main#egg=asyncvnc
|
||||||
|
|
||||||
# ``cryptofeed`` for connecting to various crypto exchanges + custom fixes
|
# ``cryptofeed`` for connecting to various crypto exchanges + custom fixes
|
||||||
-e git+https://github.com/guilledk/cryptofeed.git@date_parsing#egg=cryptofeed
|
-e git+https://github.com/pikers/cryptofeed.git@date_parsing#egg=cryptofeed
|
||||||
|
|
Loading…
Reference in New Issue