diff --git a/piker/brokers/deribit/api.py b/piker/brokers/deribit/api.py index a0361fa9..c8295e9b 100644 --- a/piker/brokers/deribit/api.py +++ b/piker/brokers/deribit/api.py @@ -23,7 +23,6 @@ import time import asyncio from contextlib import asynccontextmanager as acm, AsyncExitStack -from itertools import count from functools import partial from datetime import datetime from typing import Any, Optional, Iterable, Callable @@ -36,7 +35,11 @@ from fuzzywuzzy import process as fuzzy import numpy as np from piker.data.types import Struct -from piker.data._web_bs import NoBsWs, open_autorecon_ws +from piker.data._web_bs import ( + NoBsWs, + open_autorecon_ws, + open_jsonrpc_session +) from .._util import resproc @@ -422,65 +425,14 @@ async def get_client( async with ( trio.open_nursery() as n, - open_autorecon_ws(_testnet_ws_url) as ws + open_jsonrpc_session( + _testnet_ws_url, dtype=JSONRPCResult) as json_rpc ): - - _rpc_id: Iterable = count(0) - _rpc_results: dict[int, dict] = {} - - _expiry_time: int = float('inf') - _access_token: Optional[str] = None - _refresh_token: Optional[str] = None - - async def json_rpc(method: str, params: dict) -> dict: - """perform a json rpc call and wait for the result, raise exception in - case of error field present on response - """ - msg = { - 'jsonrpc': '2.0', - 'id': next(_rpc_id), - 'method': method, - 'params': params - } - _id = msg['id'] - - _rpc_results[_id] = { - 'result': None, - 'event': trio.Event() - } - - await ws.send_msg(msg) - - await _rpc_results[_id]['event'].wait() - - ret = _rpc_results[_id]['result'] - - del _rpc_results[_id] - - if ret.error is not None: - raise Exception(json.dumps(ret.error, indent=4)) - - return ret - - async def _recv_task(): - """receives every ws message and stores it in its corresponding result - field, then sets the event to wakeup original sender tasks. - """ - async for msg in ws: - msg = JSONRPCResult(**msg) - - if msg.id not in _rpc_results: - # in case this message wasn't beign accounted for store it - _rpc_results[msg.id] = { - 'result': None, - 'event': trio.Event() - } - - _rpc_results[msg.id]['result'] = msg - _rpc_results[msg.id]['event'].set() - client = Client(json_rpc) + _refresh_token: Optional[str] = None + _access_token: Optional[str] = None + async def _auth_loop( task_status: TaskStatus = trio.TASK_STATUS_IGNORED ): @@ -536,7 +488,6 @@ async def get_client( else: await trio.sleep(renew_time / 2) - n.start_soon(_recv_task) # if we have client creds launch auth loop if client._key_id is not None: await n.start(_auth_loop) diff --git a/piker/data/_web_bs.py b/piker/data/_web_bs.py index 78e82dfd..7801a409 100644 --- a/piker/data/_web_bs.py +++ b/piker/data/_web_bs.py @@ -19,6 +19,7 @@ ToOlS fOr CoPInG wITh "tHE wEB" protocols. """ from contextlib import asynccontextmanager, AsyncExitStack +from itertools import count from types import ModuleType from typing import Any, Optional, Callable, AsyncGenerator import json @@ -35,6 +36,8 @@ from trio_websocket._impl import ( from ..log import get_logger +from .types import Struct + log = get_logger(__name__) @@ -150,3 +153,85 @@ async def open_autorecon_ws( finally: await stack.aclose() + + +''' +JSONRPC response-request style machinery for transparent multiplexing of msgs +over a NoBsWs. +''' + + +class JSONRPCResult(Struct): + jsonrpc: str = '2.0' + id: int + result: Optional[dict] = None + error: Optional[dict] = None + + +@asynccontextmanager +async def open_jsonrpc_session( + url: str, + start_id: int = 0, + dtype: type = JSONRPCResult +) -> Callable[[str, dict], dict]: + + async with ( + trio.open_nursery() as n, + open_autorecon_ws(url) as ws + ): + _rpc_id: Iterable = count(start_id) + _rpc_results: dict[int, dict] = {} + + async def json_rpc(method: str, params: dict) -> dict: + ''' + perform a json rpc call and wait for the result, raise exception in + case of error field present on response + ''' + msg = { + 'jsonrpc': '2.0', + 'id': next(_rpc_id), + 'method': method, + 'params': params + } + _id = msg['id'] + + _rpc_results[_id] = { + 'result': None, + 'event': trio.Event() + } + + await ws.send_msg(msg) + + await _rpc_results[_id]['event'].wait() + + ret = _rpc_results[_id]['result'] + + del _rpc_results[_id] + + if ret.error is not None: + raise Exception(json.dumps(ret.error, indent=4)) + + return ret + + async def _recv_task(): + ''' + receives every ws message and stores it in its corresponding result + field, then sets the event to wakeup original sender tasks. + ''' + async for msg in ws: + msg = dtype(**msg) + + if msg.id not in _rpc_results: + # in case this message wasn't beign accounted for store it + _rpc_results[msg.id] = { + 'result': None, + 'event': trio.Event() + } + + _rpc_results[msg.id]['result'] = msg + _rpc_results[msg.id]['event'].set() + + + n.start_soon(_recv_task) + yield json_rpc + n.cancel_scope.cancel()