diff --git a/piker/data/_web_bs.py b/piker/data/_web_bs.py index a597f7be..779a7d44 100644 --- a/piker/data/_web_bs.py +++ b/piker/data/_web_bs.py @@ -178,9 +178,6 @@ async def open_jsonrpc_session( request_type: Optional[type] = None, request_hook: Optional[Callable] = None, error_hook: Optional[Callable] = None, - timeout: int = 5, - timeout_hook: Optional[Callable] = None, - timeout_args: list = [], ) -> Callable[[str, dict], dict]: async with ( @@ -226,48 +223,41 @@ async def open_jsonrpc_session( receives every ws message and stores it in its corresponding result field, then sets the event to wakeup original sender tasks. also recieves responses to requests originated from the server side. - reconnects the tasks after timeout. ''' - with trio.move_on_after(timeout) as cancel_scope: - async for msg in ws: - match msg: - case { - 'result': result, - 'id': mid, - } if res_entry := rpc_results.get(mid): - res_entry['result'] = response_type(**msg) - res_entry['event'].set() + async for msg in ws: + match msg: + case { + 'result': result, + 'id': mid, + } if res_entry := rpc_results.get(mid): - case { - 'result': _, - 'id': mid, - } if not rpc_results.get(mid): - log.warning(f'Wasn\'t expecting ws msg: {json.dumps(msg, indent=4)}') + res_entry['result'] = response_type(**msg) + res_entry['event'].set() - case { - 'method': _, - 'params': _, - }: - log.debug(f'Recieved\n{msg}') - if request_hook: - await request_hook(request_type(**msg)) + case { + 'result': _, + 'id': mid, + } if not rpc_results.get(mid): + log.warning(f'Wasn\'t expecting ws msg: {json.dumps(msg, indent=4)}') - case { - 'error': error - }: - log.warning(f'Recieved\n{error}') - if error_hook: - await error_hook(response_type(**msg)) + case { + 'method': _, + 'params': _, + }: + log.debug(f'Recieved\n{msg}') + if request_hook: + await request_hook(request_type(**msg)) - case _: - log.warning(f'Unhandled JSON-RPC msg!?\n{msg}') + case { + 'error': error + }: + log.warning(f'Recieved\n{error}') + if error_hook: + await error_hook(response_type(**msg)) - if cancel_scope.cancelled_caught: - await ws._connect() - n.start_soon(recv_task) - if timeout_hook: - n.start_soon(timeout_hook, json_rpc, *timeout_args) + case _: + log.warning(f'Unhandled JSON-RPC msg!?\n{msg}') n.start_soon(recv_task)