Pull jsonrpc machinery out of deribit backend into piker.data._web_bs module and make it generic

size_in_shm_token
Guillermo Rodriguez 2022-08-25 12:08:19 -03:00
parent c5447fda06
commit 4facd161a9
No known key found for this signature in database
GPG Key ID: EC3AB66D5D83B392
2 changed files with 95 additions and 59 deletions

View File

@ -23,7 +23,6 @@ import time
import asyncio
from contextlib import asynccontextmanager as acm, AsyncExitStack
from itertools import count
from functools import partial
from datetime import datetime
from typing import Any, Optional, Iterable, Callable
@ -36,7 +35,11 @@ from fuzzywuzzy import process as fuzzy
import numpy as np
from piker.data.types import Struct
from piker.data._web_bs import NoBsWs, open_autorecon_ws
from piker.data._web_bs import (
NoBsWs,
open_autorecon_ws,
open_jsonrpc_session
)
from .._util import resproc
@ -422,65 +425,14 @@ async def get_client(
async with (
trio.open_nursery() as n,
open_autorecon_ws(_testnet_ws_url) as ws
open_jsonrpc_session(
_testnet_ws_url, dtype=JSONRPCResult) as json_rpc
):
_rpc_id: Iterable = count(0)
_rpc_results: dict[int, dict] = {}
_expiry_time: int = float('inf')
_access_token: Optional[str] = None
_refresh_token: Optional[str] = None
async def json_rpc(method: str, params: dict) -> dict:
"""perform a json rpc call and wait for the result, raise exception in
case of error field present on response
"""
msg = {
'jsonrpc': '2.0',
'id': next(_rpc_id),
'method': method,
'params': params
}
_id = msg['id']
_rpc_results[_id] = {
'result': None,
'event': trio.Event()
}
await ws.send_msg(msg)
await _rpc_results[_id]['event'].wait()
ret = _rpc_results[_id]['result']
del _rpc_results[_id]
if ret.error is not None:
raise Exception(json.dumps(ret.error, indent=4))
return ret
async def _recv_task():
"""receives every ws message and stores it in its corresponding result
field, then sets the event to wakeup original sender tasks.
"""
async for msg in ws:
msg = JSONRPCResult(**msg)
if msg.id not in _rpc_results:
# in case this message wasn't beign accounted for store it
_rpc_results[msg.id] = {
'result': None,
'event': trio.Event()
}
_rpc_results[msg.id]['result'] = msg
_rpc_results[msg.id]['event'].set()
client = Client(json_rpc)
_refresh_token: Optional[str] = None
_access_token: Optional[str] = None
async def _auth_loop(
task_status: TaskStatus = trio.TASK_STATUS_IGNORED
):
@ -536,7 +488,6 @@ async def get_client(
else:
await trio.sleep(renew_time / 2)
n.start_soon(_recv_task)
# if we have client creds launch auth loop
if client._key_id is not None:
await n.start(_auth_loop)

View File

@ -19,6 +19,7 @@ ToOlS fOr CoPInG wITh "tHE wEB" protocols.
"""
from contextlib import asynccontextmanager, AsyncExitStack
from itertools import count
from types import ModuleType
from typing import Any, Optional, Callable, AsyncGenerator
import json
@ -35,6 +36,8 @@ from trio_websocket._impl import (
from ..log import get_logger
from .types import Struct
log = get_logger(__name__)
@ -150,3 +153,85 @@ async def open_autorecon_ws(
finally:
await stack.aclose()
'''
JSONRPC response-request style machinery for transparent multiplexing of msgs
over a NoBsWs.
'''
class JSONRPCResult(Struct):
jsonrpc: str = '2.0'
id: int
result: Optional[dict] = None
error: Optional[dict] = None
@asynccontextmanager
async def open_jsonrpc_session(
url: str,
start_id: int = 0,
dtype: type = JSONRPCResult
) -> Callable[[str, dict], dict]:
async with (
trio.open_nursery() as n,
open_autorecon_ws(url) as ws
):
_rpc_id: Iterable = count(start_id)
_rpc_results: dict[int, dict] = {}
async def json_rpc(method: str, params: dict) -> dict:
'''
perform a json rpc call and wait for the result, raise exception in
case of error field present on response
'''
msg = {
'jsonrpc': '2.0',
'id': next(_rpc_id),
'method': method,
'params': params
}
_id = msg['id']
_rpc_results[_id] = {
'result': None,
'event': trio.Event()
}
await ws.send_msg(msg)
await _rpc_results[_id]['event'].wait()
ret = _rpc_results[_id]['result']
del _rpc_results[_id]
if ret.error is not None:
raise Exception(json.dumps(ret.error, indent=4))
return ret
async def _recv_task():
'''
receives every ws message and stores it in its corresponding result
field, then sets the event to wakeup original sender tasks.
'''
async for msg in ws:
msg = dtype(**msg)
if msg.id not in _rpc_results:
# in case this message wasn't beign accounted for store it
_rpc_results[msg.id] = {
'result': None,
'event': trio.Event()
}
_rpc_results[msg.id]['result'] = msg
_rpc_results[msg.id]['event'].set()
n.start_soon(_recv_task)
yield json_rpc
n.cancel_scope.cancel()