From 4facd161a9977f0c1eb06bcc897e902bbdef6da3 Mon Sep 17 00:00:00 2001 From: Guillermo Rodriguez Date: Thu, 25 Aug 2022 12:08:19 -0300 Subject: [PATCH 1/2] Pull jsonrpc machinery out of deribit backend into piker.data._web_bs module and make it generic --- piker/brokers/deribit/api.py | 69 +++++------------------------ piker/data/_web_bs.py | 85 ++++++++++++++++++++++++++++++++++++ 2 files changed, 95 insertions(+), 59 deletions(-) diff --git a/piker/brokers/deribit/api.py b/piker/brokers/deribit/api.py index a0361fa9..c8295e9b 100644 --- a/piker/brokers/deribit/api.py +++ b/piker/brokers/deribit/api.py @@ -23,7 +23,6 @@ import time import asyncio from contextlib import asynccontextmanager as acm, AsyncExitStack -from itertools import count from functools import partial from datetime import datetime from typing import Any, Optional, Iterable, Callable @@ -36,7 +35,11 @@ from fuzzywuzzy import process as fuzzy import numpy as np from piker.data.types import Struct -from piker.data._web_bs import NoBsWs, open_autorecon_ws +from piker.data._web_bs import ( + NoBsWs, + open_autorecon_ws, + open_jsonrpc_session +) from .._util import resproc @@ -422,65 +425,14 @@ async def get_client( async with ( trio.open_nursery() as n, - open_autorecon_ws(_testnet_ws_url) as ws + open_jsonrpc_session( + _testnet_ws_url, dtype=JSONRPCResult) as json_rpc ): - - _rpc_id: Iterable = count(0) - _rpc_results: dict[int, dict] = {} - - _expiry_time: int = float('inf') - _access_token: Optional[str] = None - _refresh_token: Optional[str] = None - - async def json_rpc(method: str, params: dict) -> dict: - """perform a json rpc call and wait for the result, raise exception in - case of error field present on response - """ - msg = { - 'jsonrpc': '2.0', - 'id': next(_rpc_id), - 'method': method, - 'params': params - } - _id = msg['id'] - - _rpc_results[_id] = { - 'result': None, - 'event': trio.Event() - } - - await ws.send_msg(msg) - - await _rpc_results[_id]['event'].wait() - - ret = _rpc_results[_id]['result'] - - del _rpc_results[_id] - - if ret.error is not None: - raise Exception(json.dumps(ret.error, indent=4)) - - return ret - - async def _recv_task(): - """receives every ws message and stores it in its corresponding result - field, then sets the event to wakeup original sender tasks. - """ - async for msg in ws: - msg = JSONRPCResult(**msg) - - if msg.id not in _rpc_results: - # in case this message wasn't beign accounted for store it - _rpc_results[msg.id] = { - 'result': None, - 'event': trio.Event() - } - - _rpc_results[msg.id]['result'] = msg - _rpc_results[msg.id]['event'].set() - client = Client(json_rpc) + _refresh_token: Optional[str] = None + _access_token: Optional[str] = None + async def _auth_loop( task_status: TaskStatus = trio.TASK_STATUS_IGNORED ): @@ -536,7 +488,6 @@ async def get_client( else: await trio.sleep(renew_time / 2) - n.start_soon(_recv_task) # if we have client creds launch auth loop if client._key_id is not None: await n.start(_auth_loop) diff --git a/piker/data/_web_bs.py b/piker/data/_web_bs.py index 78e82dfd..7801a409 100644 --- a/piker/data/_web_bs.py +++ b/piker/data/_web_bs.py @@ -19,6 +19,7 @@ ToOlS fOr CoPInG wITh "tHE wEB" protocols. """ from contextlib import asynccontextmanager, AsyncExitStack +from itertools import count from types import ModuleType from typing import Any, Optional, Callable, AsyncGenerator import json @@ -35,6 +36,8 @@ from trio_websocket._impl import ( from ..log import get_logger +from .types import Struct + log = get_logger(__name__) @@ -150,3 +153,85 @@ async def open_autorecon_ws( finally: await stack.aclose() + + +''' +JSONRPC response-request style machinery for transparent multiplexing of msgs +over a NoBsWs. +''' + + +class JSONRPCResult(Struct): + jsonrpc: str = '2.0' + id: int + result: Optional[dict] = None + error: Optional[dict] = None + + +@asynccontextmanager +async def open_jsonrpc_session( + url: str, + start_id: int = 0, + dtype: type = JSONRPCResult +) -> Callable[[str, dict], dict]: + + async with ( + trio.open_nursery() as n, + open_autorecon_ws(url) as ws + ): + _rpc_id: Iterable = count(start_id) + _rpc_results: dict[int, dict] = {} + + async def json_rpc(method: str, params: dict) -> dict: + ''' + perform a json rpc call and wait for the result, raise exception in + case of error field present on response + ''' + msg = { + 'jsonrpc': '2.0', + 'id': next(_rpc_id), + 'method': method, + 'params': params + } + _id = msg['id'] + + _rpc_results[_id] = { + 'result': None, + 'event': trio.Event() + } + + await ws.send_msg(msg) + + await _rpc_results[_id]['event'].wait() + + ret = _rpc_results[_id]['result'] + + del _rpc_results[_id] + + if ret.error is not None: + raise Exception(json.dumps(ret.error, indent=4)) + + return ret + + async def _recv_task(): + ''' + receives every ws message and stores it in its corresponding result + field, then sets the event to wakeup original sender tasks. + ''' + async for msg in ws: + msg = dtype(**msg) + + if msg.id not in _rpc_results: + # in case this message wasn't beign accounted for store it + _rpc_results[msg.id] = { + 'result': None, + 'event': trio.Event() + } + + _rpc_results[msg.id]['result'] = msg + _rpc_results[msg.id]['event'].set() + + + n.start_soon(_recv_task) + yield json_rpc + n.cancel_scope.cancel() From 0c323fdc0b66629c997e5f75ccfb5d30e0a72023 Mon Sep 17 00:00:00 2001 From: Guillermo Rodriguez Date: Sat, 27 Aug 2022 09:12:02 -0300 Subject: [PATCH 2/2] Minor style changes and warning on unexpected msg --- piker/data/_web_bs.py | 35 ++++++++++++++++++----------------- 1 file changed, 18 insertions(+), 17 deletions(-) diff --git a/piker/data/_web_bs.py b/piker/data/_web_bs.py index 7801a409..da4f2a30 100644 --- a/piker/data/_web_bs.py +++ b/piker/data/_web_bs.py @@ -179,8 +179,8 @@ async def open_jsonrpc_session( trio.open_nursery() as n, open_autorecon_ws(url) as ws ): - _rpc_id: Iterable = count(start_id) - _rpc_results: dict[int, dict] = {} + rpc_id: Iterable = count(start_id) + rpc_results: dict[int, dict] = {} async def json_rpc(method: str, params: dict) -> dict: ''' @@ -189,31 +189,31 @@ async def open_jsonrpc_session( ''' msg = { 'jsonrpc': '2.0', - 'id': next(_rpc_id), + 'id': next(rpc_id), 'method': method, 'params': params } _id = msg['id'] - _rpc_results[_id] = { + rpc_results[_id] = { 'result': None, 'event': trio.Event() } await ws.send_msg(msg) - await _rpc_results[_id]['event'].wait() + await rpc_results[_id]['event'].wait() - ret = _rpc_results[_id]['result'] + ret = rpc_results[_id]['result'] - del _rpc_results[_id] + del rpc_results[_id] if ret.error is not None: raise Exception(json.dumps(ret.error, indent=4)) return ret - async def _recv_task(): + async def recv_task(): ''' receives every ws message and stores it in its corresponding result field, then sets the event to wakeup original sender tasks. @@ -221,17 +221,18 @@ async def open_jsonrpc_session( async for msg in ws: msg = dtype(**msg) - if msg.id not in _rpc_results: - # in case this message wasn't beign accounted for store it - _rpc_results[msg.id] = { - 'result': None, - 'event': trio.Event() - } + if msg.id not in rpc_results: + log.warning(f'Wasn\'t expecting ws msg: {json.dumps(msg, indent=4)}') - _rpc_results[msg.id]['result'] = msg - _rpc_results[msg.id]['event'].set() + res = rpc_results.setdefault( + msg.id, + {'result': None, 'event': trio.Event()} + ) + + res['result'] = msg + res['event'].set() - n.start_soon(_recv_task) + n.start_soon(recv_task) yield json_rpc n.cancel_scope.cancel()