Begin jsonrpc over ws refactor

size_in_shm_token
Guillermo Rodriguez 2022-08-21 23:01:03 -03:00
parent 9073fbc317
commit 92090b01b8
No known key found for this signature in database
GPG Key ID: EC3AB66D5D83B392
3 changed files with 174 additions and 53 deletions

View File

@ -1,3 +1,6 @@
# piker: trading gear for hackers
# Copyright (C) Guillermo Rodriguez (in stewardship for piker0)
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
@ -15,17 +18,22 @@
Deribit backend.
'''
import time
from contextlib import asynccontextmanager as acm
from contextlib import asynccontextmanager as acm, AsyncExitStack
from itertools import count
from datetime import datetime
from typing import Any, Optional, List
from typing import Any, List, Dict, Optional, Iterable
import pendulum
import asks
import trio
from trio_typing import Nursery, TaskStatus
from fuzzywuzzy import process as fuzzy
import numpy as np
from piker.data.types import Struct
from piker.data._web_bs import NoBsWs, open_autorecon_ws
from .._util import resproc
@ -38,6 +46,7 @@ log = get_logger(__name__)
_url = 'https://www.deribit.com'
_ws_url = 'wss://www.deribit.com/ws/api/v2'
# Broker specific ohlc schema (rest)
@ -54,6 +63,15 @@ _ohlc_dtype = [
class JSONRPCResult(Struct):
jsonrpc: str = '2.0'
id: int
result: dict
usIn: int
usOut: int
usDiff: int
testnet: bool
class JSONRPCHTTPResult(Struct):
jsonrpc: str = '2.0'
result: dict
usIn: int
@ -95,13 +113,131 @@ def deribit_timestamp(when):
return int((when.timestamp() * 1000) + (when.microsecond / 1000))
def get_config() -> dict[str, Any]:
conf, path = config.load()
section = conf.get('deribit')
if section is None:
log.warning(f'No config section found for deribit in {path}')
return {}
conf['log'] = {}
conf['log']['disabled'] = True
return conf
class Client:
def __init__(self) -> None:
def __init__(self, n: Nursery, ws: NoBsWs) -> None:
self._sesh = asks.Session(connections=4)
self._sesh.base_location = _url
self._pairs: dict[str, Any] = {}
config = get_config()['deribit']
self._key_id = config['key_id']
self._key_secret = config['key_secret']
self._ws = ws
self._n = n
self._rpc_id: Iterable = count(0)
self._rpc_results: Dict[int, Dict] = {}
self._expiry_time: int = float('inf')
self._access_token: Optional[str] = None
self._refresh_token: Optional[str] = None
def _next_json_body(self, method: str, params: Dict):
return {
'jsonrpc': '2.0',
'id': next(self._rpc_id),
'method': method,
'params': params
}
async def start_rpc(self):
self._n.start_soon(self._recv_task)
await self._n.start(self._auth_loop)
async def _recv_task(self):
while True:
msg = JSONRPCResult(**(await self._ws.recv_msg()))
if msg.id not in self._rpc_results:
self._rpc_results[msg.id] = {
'result': None,
'event': trio.Event()
}
self._rpc_results[msg.id]['result'] = msg
self._rpc_results[msg.id]['event'].set()
async def json_rpc(self, method: str, params: Dict) -> Dict:
msg = self._next_json_body(method, params)
_id = msg['id']
self._rpc_results[_id] = {
'result': None,
'event': trio.Event()
}
await self._ws.send_msg(msg)
await self._rpc_results[_id]['event'].wait()
ret = self._rpc_results[_id]['result']
del self._rpc_results[_id]
return ret
async def _auth_loop(
self,
task_status: TaskStatus = trio.TASK_STATUS_IGNORED
):
'''https://docs.deribit.com/?python#authentication-2
'''
renew_time = 10
access_scope = 'trade:read_write'
self._expiry_time = time.time()
got_access = False
while True:
if time.time() - self._expiry_time < renew_time:
if self._refresh_token != None:
params = {
'grant_type': 'refresh_token',
'refresh_token': self._refresh_token,
'scope': access_scope
}
else:
params = {
'grant_type': 'client_credentials',
'client_id': self._key_id,
'client_secret': self._key_secret,
'scope': access_scope
}
resp = await self.json_rpc('public/auth', params)
result = resp.result
self._expiry_time = time.time() + result['expires_in']
self._refresh_token = result['refresh_token']
if 'access_token' in result:
self._access_token = result['access_token']
if not got_access:
got_access = True
task_status.started()
else:
await trio.sleep(renew_time / 2)
async def _api(
self,
method: str,
@ -134,10 +270,8 @@ class Client:
'expired': str(expired).lower()
}
resp = await self._api(
'get_instruments', params=params)
results = resp['result']
resp = await self.json_rpc('public/get_instruments', params)
results = resp.result
instruments = {
item['instrument_name']: item for item in results}
@ -195,19 +329,16 @@ class Client:
end_time = deribit_timestamp(end_dt)
# https://docs.deribit.com/#public-get_tradingview_chart_data
response = await self._api(
'get_tradingview_chart_data',
resp = await self.json_rpc(
'public/get_tradingview_chart_data',
params={
'instrument_name': instrument.upper(),
'start_timestamp': start_time,
'end_timestamp': end_time,
'resolution': '1'
}
)
})
klines = JSONRPCResult(**response)
result = KLinesResult(**klines.result)
result = KLinesResult(**resp.result)
new_bars = []
for i in range(len(result.close)):
@ -237,19 +368,24 @@ class Client:
instrument: str,
count: int = 10
):
response = await self._api(
'get_last_trades_by_instrument',
resp = await self.json_rpc(
'public/get_last_trades_by_instrument',
params={
'instrument_name': instrument,
'count': count
}
)
})
return LastTradesResult(**(JSONRPCResult(**response).result))
return LastTradesResult(**resp.result)
@acm
async def get_client() -> Client:
client = Client()
await client.cache_symbols()
yield client
async with (
trio.open_nursery() as n,
open_autorecon_ws(_ws_url) as ws
):
client = Client(n, ws)
await client.start_rpc()
await client.cache_symbols()
yield client

View File

@ -1,3 +1,6 @@
# piker: trading gear for hackers
# Copyright (C) Guillermo Rodriguez (in stewardship for piker0)
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
@ -31,7 +34,6 @@ import numpy as np
import tractor
from tractor import to_asyncio
from piker import config
from piker._cacheables import open_cached_client
from piker.log import get_logger, get_console_log
from piker.data import ShmArray
@ -47,32 +49,13 @@ from cryptofeed.defines import (
)
from cryptofeed.symbols import Symbol
from .api import Client, Trade
from .api import Client, Trade, get_config
_spawn_kwargs = {
'infect_asyncio': True,
}
def get_config() -> dict[str, Any]:
conf, path = config.load()
section = conf.get('deribit')
if section is None:
log.warning(f'No config section found for deribit in {path}')
return {}
conf['log'] = {}
conf['log']['disabled'] = True
# conf['log']['filename'] = '/tmp/feedhandler.log'
# conf['log']['level'] = 'WARNING'
return conf
log = get_logger(__name__)

View File

@ -20,7 +20,7 @@ ToOlS fOr CoPInG wITh "tHE wEB" protocols.
"""
from contextlib import asynccontextmanager, AsyncExitStack
from types import ModuleType
from typing import Any, Callable, AsyncGenerator
from typing import Any, Optional, Callable, AsyncGenerator
import json
import trio
@ -54,8 +54,8 @@ class NoBsWs:
self,
url: str,
stack: AsyncExitStack,
fixture: Callable,
serializer: ModuleType = json,
fixture: Optional[Callable] = None,
serializer: ModuleType = json
):
self.url = url
self.fixture = fixture
@ -80,12 +80,14 @@ class NoBsWs:
self._ws = await self._stack.enter_async_context(
trio_websocket.open_websocket_url(self.url)
)
# rerun user code fixture
ret = await self._stack.enter_async_context(
self.fixture(self)
)
assert ret is None
if self.fixture is not None:
# rerun user code fixture
ret = await self._stack.enter_async_context(
self.fixture(self)
)
assert ret is None
log.info(f'Connection success: {self.url}')
return self._ws
@ -127,7 +129,7 @@ async def open_autorecon_ws(
url: str,
# TODO: proper type annot smh
fixture: Callable,
fixture: Optional[Callable] = None,
) -> AsyncGenerator[tuple[...], NoBsWs]:
"""Apparently we can QoS for all sorts of reasons..so catch em.