Merge pull request #391 from pikers/json_rpc_generic

Pull jsonrpc machinery out of deribit backend
size_in_shm_token
goodboy 2022-08-27 15:33:12 -04:00 committed by GitHub
commit 71412310c4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 96 additions and 59 deletions

View File

@ -23,7 +23,6 @@ import time
import asyncio import asyncio
from contextlib import asynccontextmanager as acm, AsyncExitStack from contextlib import asynccontextmanager as acm, AsyncExitStack
from itertools import count
from functools import partial from functools import partial
from datetime import datetime from datetime import datetime
from typing import Any, Optional, Iterable, Callable from typing import Any, Optional, Iterable, Callable
@ -36,7 +35,11 @@ from fuzzywuzzy import process as fuzzy
import numpy as np import numpy as np
from piker.data.types import Struct from piker.data.types import Struct
from piker.data._web_bs import NoBsWs, open_autorecon_ws from piker.data._web_bs import (
NoBsWs,
open_autorecon_ws,
open_jsonrpc_session
)
from .._util import resproc from .._util import resproc
@ -422,65 +425,14 @@ async def get_client(
async with ( async with (
trio.open_nursery() as n, trio.open_nursery() as n,
open_autorecon_ws(_testnet_ws_url) as ws open_jsonrpc_session(
_testnet_ws_url, dtype=JSONRPCResult) as json_rpc
): ):
_rpc_id: Iterable = count(0)
_rpc_results: dict[int, dict] = {}
_expiry_time: int = float('inf')
_access_token: Optional[str] = None
_refresh_token: Optional[str] = None
async def json_rpc(method: str, params: dict) -> dict:
"""perform a json rpc call and wait for the result, raise exception in
case of error field present on response
"""
msg = {
'jsonrpc': '2.0',
'id': next(_rpc_id),
'method': method,
'params': params
}
_id = msg['id']
_rpc_results[_id] = {
'result': None,
'event': trio.Event()
}
await ws.send_msg(msg)
await _rpc_results[_id]['event'].wait()
ret = _rpc_results[_id]['result']
del _rpc_results[_id]
if ret.error is not None:
raise Exception(json.dumps(ret.error, indent=4))
return ret
async def _recv_task():
"""receives every ws message and stores it in its corresponding result
field, then sets the event to wakeup original sender tasks.
"""
async for msg in ws:
msg = JSONRPCResult(**msg)
if msg.id not in _rpc_results:
# in case this message wasn't beign accounted for store it
_rpc_results[msg.id] = {
'result': None,
'event': trio.Event()
}
_rpc_results[msg.id]['result'] = msg
_rpc_results[msg.id]['event'].set()
client = Client(json_rpc) client = Client(json_rpc)
_refresh_token: Optional[str] = None
_access_token: Optional[str] = None
async def _auth_loop( async def _auth_loop(
task_status: TaskStatus = trio.TASK_STATUS_IGNORED task_status: TaskStatus = trio.TASK_STATUS_IGNORED
): ):
@ -536,7 +488,6 @@ async def get_client(
else: else:
await trio.sleep(renew_time / 2) await trio.sleep(renew_time / 2)
n.start_soon(_recv_task)
# if we have client creds launch auth loop # if we have client creds launch auth loop
if client._key_id is not None: if client._key_id is not None:
await n.start(_auth_loop) await n.start(_auth_loop)

View File

@ -19,6 +19,7 @@ ToOlS fOr CoPInG wITh "tHE wEB" protocols.
""" """
from contextlib import asynccontextmanager, AsyncExitStack from contextlib import asynccontextmanager, AsyncExitStack
from itertools import count
from types import ModuleType from types import ModuleType
from typing import Any, Optional, Callable, AsyncGenerator from typing import Any, Optional, Callable, AsyncGenerator
import json import json
@ -35,6 +36,8 @@ from trio_websocket._impl import (
from ..log import get_logger from ..log import get_logger
from .types import Struct
log = get_logger(__name__) log = get_logger(__name__)
@ -150,3 +153,86 @@ async def open_autorecon_ws(
finally: finally:
await stack.aclose() await stack.aclose()
'''
JSONRPC response-request style machinery for transparent multiplexing of msgs
over a NoBsWs.
'''
class JSONRPCResult(Struct):
jsonrpc: str = '2.0'
id: int
result: Optional[dict] = None
error: Optional[dict] = None
@asynccontextmanager
async def open_jsonrpc_session(
url: str,
start_id: int = 0,
dtype: type = JSONRPCResult
) -> Callable[[str, dict], dict]:
async with (
trio.open_nursery() as n,
open_autorecon_ws(url) as ws
):
rpc_id: Iterable = count(start_id)
rpc_results: dict[int, dict] = {}
async def json_rpc(method: str, params: dict) -> dict:
'''
perform a json rpc call and wait for the result, raise exception in
case of error field present on response
'''
msg = {
'jsonrpc': '2.0',
'id': next(rpc_id),
'method': method,
'params': params
}
_id = msg['id']
rpc_results[_id] = {
'result': None,
'event': trio.Event()
}
await ws.send_msg(msg)
await rpc_results[_id]['event'].wait()
ret = rpc_results[_id]['result']
del rpc_results[_id]
if ret.error is not None:
raise Exception(json.dumps(ret.error, indent=4))
return ret
async def recv_task():
'''
receives every ws message and stores it in its corresponding result
field, then sets the event to wakeup original sender tasks.
'''
async for msg in ws:
msg = dtype(**msg)
if msg.id not in rpc_results:
log.warning(f'Wasn\'t expecting ws msg: {json.dumps(msg, indent=4)}')
res = rpc_results.setdefault(
msg.id,
{'result': None, 'event': trio.Event()}
)
res['result'] = msg
res['event'].set()
n.start_soon(recv_task)
yield json_rpc
n.cancel_scope.cancel()