diff --git a/piker/brokers/deribit/api.py b/piker/brokers/deribit/api.py index c8295e9b..4159b18a 100644 --- a/piker/brokers/deribit/api.py +++ b/piker/brokers/deribit/api.py @@ -95,9 +95,14 @@ class JSONRPCResult(Struct): error: Optional[dict] = None usIn: int usOut: int - usDiff: int + usDiff: int testnet: bool +class JSONRPCChannel(Struct): + jsonrpc: str = '2.0' + method: str + params: dict + class KLinesResult(Struct): close: list[float] diff --git a/piker/data/_web_bs.py b/piker/data/_web_bs.py index c42b7afa..779a7d44 100644 --- a/piker/data/_web_bs.py +++ b/piker/data/_web_bs.py @@ -23,6 +23,7 @@ from itertools import count from types import ModuleType from typing import Any, Optional, Callable, AsyncGenerator import json +import sys import trio import trio_websocket @@ -139,7 +140,7 @@ class NoBsWs: async def open_autorecon_ws( url: str, - # TODO: proper type annot smh + # TODO: proper type cannot smh fixture: Optional[Callable] = None, ) -> AsyncGenerator[tuple[...], NoBsWs]: @@ -169,12 +170,14 @@ class JSONRPCResult(Struct): result: Optional[dict] = None error: Optional[dict] = None - @asynccontextmanager async def open_jsonrpc_session( url: str, start_id: int = 0, - dtype: type = JSONRPCResult + response_type: type = JSONRPCResult, + request_type: Optional[type] = None, + request_hook: Optional[Callable] = None, + error_hook: Optional[Callable] = None, ) -> Callable[[str, dict], dict]: async with ( @@ -219,20 +222,42 @@ async def open_jsonrpc_session( ''' receives every ws message and stores it in its corresponding result field, then sets the event to wakeup original sender tasks. + also recieves responses to requests originated from the server side. ''' + async for msg in ws: - msg = dtype(**msg) + match msg: + case { + 'result': result, + 'id': mid, + } if res_entry := rpc_results.get(mid): - if msg.id not in rpc_results: - log.warning(f'Wasn\'t expecting ws msg: {json.dumps(msg, indent=4)}') + res_entry['result'] = response_type(**msg) + res_entry['event'].set() - res = rpc_results.setdefault( - msg.id, - {'result': None, 'event': trio.Event()} - ) + case { + 'result': _, + 'id': mid, + } if not rpc_results.get(mid): + log.warning(f'Wasn\'t expecting ws msg: {json.dumps(msg, indent=4)}') - res['result'] = msg - res['event'].set() + case { + 'method': _, + 'params': _, + }: + log.debug(f'Recieved\n{msg}') + if request_hook: + await request_hook(request_type(**msg)) + + case { + 'error': error + }: + log.warning(f'Recieved\n{error}') + if error_hook: + await error_hook(response_type(**msg)) + + case _: + log.warning(f'Unhandled JSON-RPC msg!?\n{msg}') n.start_soon(recv_task)