Begin jsonrpc over ws refactor

deribit
Guillermo Rodriguez 2022-08-21 23:01:03 -03:00
parent 9073fbc317
commit 5f60923ac1
No known key found for this signature in database
GPG Key ID: EC3AB66D5D83B392
4 changed files with 206 additions and 53 deletions

View File

@ -1,3 +1,6 @@
# piker: trading gear for hackers
# Copyright (C) Guillermo Rodriguez (in stewardship for piker0)
# This program is free software: you can redistribute it and/or modify # This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by # it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or # the Free Software Foundation, either version 3 of the License, or
@ -15,17 +18,22 @@
Deribit backend. Deribit backend.
''' '''
import time
from contextlib import asynccontextmanager as acm from contextlib import asynccontextmanager as acm, AsyncExitStack
from itertools import count
from datetime import datetime from datetime import datetime
from typing import Any, Optional, List from typing import Any, List, Dict, Optional, Iterable
import pendulum import pendulum
import asks import asks
import trio
from trio_typing import Nursery, TaskStatus
from fuzzywuzzy import process as fuzzy from fuzzywuzzy import process as fuzzy
import numpy as np import numpy as np
from piker.data.types import Struct from piker.data.types import Struct
from piker.data._web_bs import NoBsWs, open_autorecon_ws
from .._util import resproc from .._util import resproc
@ -38,6 +46,7 @@ log = get_logger(__name__)
_url = 'https://www.deribit.com' _url = 'https://www.deribit.com'
_ws_url = 'wss://www.deribit.com/ws/api/v2'
# Broker specific ohlc schema (rest) # Broker specific ohlc schema (rest)
@ -54,6 +63,15 @@ _ohlc_dtype = [
class JSONRPCResult(Struct): class JSONRPCResult(Struct):
jsonrpc: str = '2.0'
id: int
result: dict
usIn: int
usOut: int
usDiff: int
testnet: bool
class JSONRPCHTTPResult(Struct):
jsonrpc: str = '2.0' jsonrpc: str = '2.0'
result: dict result: dict
usIn: int usIn: int
@ -95,13 +113,131 @@ def deribit_timestamp(when):
return int((when.timestamp() * 1000) + (when.microsecond / 1000)) return int((when.timestamp() * 1000) + (when.microsecond / 1000))
def get_config() -> dict[str, Any]:
conf, path = config.load()
section = conf.get('deribit')
if section is None:
log.warning(f'No config section found for deribit in {path}')
return {}
conf['log'] = {}
conf['log']['disabled'] = True
return conf
class Client: class Client:
def __init__(self) -> None: def __init__(self, n: Nursery, ws: NoBsWs) -> None:
self._sesh = asks.Session(connections=4) self._sesh = asks.Session(connections=4)
self._sesh.base_location = _url self._sesh.base_location = _url
self._pairs: dict[str, Any] = {} self._pairs: dict[str, Any] = {}
config = get_config()['deribit']
self._key_id = config['key_id']
self._key_secret = config['key_secret']
self._ws = ws
self._n = n
self._rpc_id: Iterable = count(0)
self._rpc_results: Dict[int, Dict] = {}
self._expiry_time: int = float('inf')
self._access_token: Optional[str] = None
self._refresh_token: Optional[str] = None
def _next_json_body(self, method: str, params: Dict):
return {
'jsonrpc': '2.0',
'id': next(self._rpc_id),
'method': method,
'params': params
}
async def start_rpc(self):
self._n.start_soon(self._recv_task)
await self._n.start(self._auth_loop)
async def _recv_task(self):
while True:
msg = JSONRPCResult(**(await self._ws.recv_msg()))
if msg.id not in self._rpc_results:
self._rpc_results[msg.id] = {
'result': None,
'event': trio.Event()
}
self._rpc_results[msg.id]['result'] = msg
self._rpc_results[msg.id]['event'].set()
async def json_rpc(self, method: str, params: Dict) -> Dict:
msg = self._next_json_body(method, params)
_id = msg['id']
self._rpc_results[_id] = {
'result': None,
'event': trio.Event()
}
await self._ws.send_msg(msg)
await self._rpc_results[_id]['event'].wait()
ret = self._rpc_results[_id]['result']
del self._rpc_results[_id]
return ret
async def _auth_loop(
self,
task_status: TaskStatus = trio.TASK_STATUS_IGNORED
):
'''https://docs.deribit.com/?python#authentication-2
'''
renew_time = 10
access_scope = 'trade:read_write'
self._expiry_time = time.time()
got_access = False
while True:
if time.time() - self._expiry_time < renew_time:
if self._refresh_token != None:
params = {
'grant_type': 'refresh_token',
'refresh_token': self._refresh_token,
'scope': access_scope
}
else:
params = {
'grant_type': 'client_credentials',
'client_id': self._key_id,
'client_secret': self._key_secret,
'scope': access_scope
}
resp = await self.json_rpc('public/auth', params)
result = resp.result
self._expiry_time = time.time() + result['expires_in']
self._refresh_token = result['refresh_token']
if 'access_token' in result:
self._access_token = result['access_token']
if not got_access:
got_access = True
task_status.started()
else:
await trio.sleep(renew_time / 2)
async def _api( async def _api(
self, self,
method: str, method: str,
@ -134,10 +270,8 @@ class Client:
'expired': str(expired).lower() 'expired': str(expired).lower()
} }
resp = await self._api( resp = await self.json_rpc('public/get_instruments', params)
'get_instruments', params=params) results = resp.result
results = resp['result']
instruments = { instruments = {
item['instrument_name']: item for item in results} item['instrument_name']: item for item in results}
@ -195,19 +329,16 @@ class Client:
end_time = deribit_timestamp(end_dt) end_time = deribit_timestamp(end_dt)
# https://docs.deribit.com/#public-get_tradingview_chart_data # https://docs.deribit.com/#public-get_tradingview_chart_data
response = await self._api( resp = await self.json_rpc(
'get_tradingview_chart_data', 'public/get_tradingview_chart_data',
params={ params={
'instrument_name': instrument.upper(), 'instrument_name': instrument.upper(),
'start_timestamp': start_time, 'start_timestamp': start_time,
'end_timestamp': end_time, 'end_timestamp': end_time,
'resolution': '1' 'resolution': '1'
} })
)
klines = JSONRPCResult(**response) result = KLinesResult(**resp.result)
result = KLinesResult(**klines.result)
new_bars = [] new_bars = []
for i in range(len(result.close)): for i in range(len(result.close)):
@ -237,19 +368,24 @@ class Client:
instrument: str, instrument: str,
count: int = 10 count: int = 10
): ):
response = await self._api( resp = await self.json_rpc(
'get_last_trades_by_instrument', 'public/get_last_trades_by_instrument',
params={ params={
'instrument_name': instrument, 'instrument_name': instrument,
'count': count 'count': count
} })
)
return LastTradesResult(**(JSONRPCResult(**response).result)) return LastTradesResult(**resp.result)
@acm @acm
async def get_client() -> Client: async def get_client() -> Client:
client = Client() async with (
await client.cache_symbols() trio.open_nursery() as n,
yield client open_autorecon_ws(_ws_url) as ws
):
client = Client(n, ws)
await client.start_rpc()
await client.cache_symbols()
yield client

View File

@ -0,0 +1,32 @@
# piker: trading gear for hackers
# Copyright (C) Guillermo Rodriguez (in stewardship for piker0)
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
Order api and machinery
'''
@tractor.context
async def trades_dialogue(
ctx: tractor.Context,
loglevel: str = None,
) -> AsyncIterator[dict[str, Any]]:
# XXX: required to propagate ``tractor`` loglevel to piker logging
get_console_log(loglevel or tractor.current_actor().loglevel)
async with get_client() as client:

View File

@ -1,3 +1,6 @@
# piker: trading gear for hackers
# Copyright (C) Guillermo Rodriguez (in stewardship for piker0)
# This program is free software: you can redistribute it and/or modify # This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by # it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or # the Free Software Foundation, either version 3 of the License, or
@ -31,7 +34,6 @@ import numpy as np
import tractor import tractor
from tractor import to_asyncio from tractor import to_asyncio
from piker import config
from piker._cacheables import open_cached_client from piker._cacheables import open_cached_client
from piker.log import get_logger, get_console_log from piker.log import get_logger, get_console_log
from piker.data import ShmArray from piker.data import ShmArray
@ -47,32 +49,13 @@ from cryptofeed.defines import (
) )
from cryptofeed.symbols import Symbol from cryptofeed.symbols import Symbol
from .api import Client, Trade from .api import Client, Trade, get_config
_spawn_kwargs = { _spawn_kwargs = {
'infect_asyncio': True, 'infect_asyncio': True,
} }
def get_config() -> dict[str, Any]:
conf, path = config.load()
section = conf.get('deribit')
if section is None:
log.warning(f'No config section found for deribit in {path}')
return {}
conf['log'] = {}
conf['log']['disabled'] = True
# conf['log']['filename'] = '/tmp/feedhandler.log'
# conf['log']['level'] = 'WARNING'
return conf
log = get_logger(__name__) log = get_logger(__name__)

View File

@ -20,7 +20,7 @@ ToOlS fOr CoPInG wITh "tHE wEB" protocols.
""" """
from contextlib import asynccontextmanager, AsyncExitStack from contextlib import asynccontextmanager, AsyncExitStack
from types import ModuleType from types import ModuleType
from typing import Any, Callable, AsyncGenerator from typing import Any, Optional, Callable, AsyncGenerator
import json import json
import trio import trio
@ -54,8 +54,8 @@ class NoBsWs:
self, self,
url: str, url: str,
stack: AsyncExitStack, stack: AsyncExitStack,
fixture: Callable, fixture: Optional[Callable] = None,
serializer: ModuleType = json, serializer: ModuleType = json
): ):
self.url = url self.url = url
self.fixture = fixture self.fixture = fixture
@ -80,12 +80,14 @@ class NoBsWs:
self._ws = await self._stack.enter_async_context( self._ws = await self._stack.enter_async_context(
trio_websocket.open_websocket_url(self.url) trio_websocket.open_websocket_url(self.url)
) )
# rerun user code fixture
ret = await self._stack.enter_async_context(
self.fixture(self)
)
assert ret is None if self.fixture is not None:
# rerun user code fixture
ret = await self._stack.enter_async_context(
self.fixture(self)
)
assert ret is None
log.info(f'Connection success: {self.url}') log.info(f'Connection success: {self.url}')
return self._ws return self._ws
@ -127,7 +129,7 @@ async def open_autorecon_ws(
url: str, url: str,
# TODO: proper type annot smh # TODO: proper type annot smh
fixture: Callable, fixture: Optional[Callable] = None,
) -> AsyncGenerator[tuple[...], NoBsWs]: ) -> AsyncGenerator[tuple[...], NoBsWs]:
"""Apparently we can QoS for all sorts of reasons..so catch em. """Apparently we can QoS for all sorts of reasons..so catch em.