diff --git a/piker/brokers/deribit/api.py b/piker/brokers/deribit/api.py index c8295e9b..4159b18a 100644 --- a/piker/brokers/deribit/api.py +++ b/piker/brokers/deribit/api.py @@ -95,9 +95,14 @@ class JSONRPCResult(Struct): error: Optional[dict] = None usIn: int usOut: int - usDiff: int + usDiff: int testnet: bool +class JSONRPCChannel(Struct): + jsonrpc: str = '2.0' + method: str + params: dict + class KLinesResult(Struct): close: list[float] diff --git a/piker/data/_web_bs.py b/piker/data/_web_bs.py index c42b7afa..a1cf44d7 100644 --- a/piker/data/_web_bs.py +++ b/piker/data/_web_bs.py @@ -174,7 +174,9 @@ class JSONRPCResult(Struct): async def open_jsonrpc_session( url: str, start_id: int = 0, - dtype: type = JSONRPCResult + response_type: type = JSONRPCResult, + request_type: Optional[type] = None, + request_hook: Optional[Callable] = None ) -> Callable[[str, dict], dict]: async with ( @@ -221,18 +223,26 @@ async def open_jsonrpc_session( field, then sets the event to wakeup original sender tasks. ''' async for msg in ws: - msg = dtype(**msg) + try: + msg = response_type(**msg) - if msg.id not in rpc_results: - log.warning(f'Wasn\'t expecting ws msg: {json.dumps(msg, indent=4)}') + if msg.id not in rpc_results: + log.warning(f'Wasn\'t expecting ws msg: {json.dumps(msg, indent=4)}') - res = rpc_results.setdefault( - msg.id, - {'result': None, 'event': trio.Event()} - ) + res = rpc_results.setdefault( + msg.id, + {'result': None, 'event': trio.Event()} + ) - res['result'] = msg - res['event'].set() + res['result'] = msg + res['event'].set() + + except TypeError: + if request_type == None: + raise + await request_hook(request_type(**msg)) + + n.start_soon(recv_task)