diff --git a/piker/data/_web_bs.py b/piker/data/_web_bs.py index 7801a409..da4f2a30 100644 --- a/piker/data/_web_bs.py +++ b/piker/data/_web_bs.py @@ -179,8 +179,8 @@ async def open_jsonrpc_session( trio.open_nursery() as n, open_autorecon_ws(url) as ws ): - _rpc_id: Iterable = count(start_id) - _rpc_results: dict[int, dict] = {} + rpc_id: Iterable = count(start_id) + rpc_results: dict[int, dict] = {} async def json_rpc(method: str, params: dict) -> dict: ''' @@ -189,31 +189,31 @@ async def open_jsonrpc_session( ''' msg = { 'jsonrpc': '2.0', - 'id': next(_rpc_id), + 'id': next(rpc_id), 'method': method, 'params': params } _id = msg['id'] - _rpc_results[_id] = { + rpc_results[_id] = { 'result': None, 'event': trio.Event() } await ws.send_msg(msg) - await _rpc_results[_id]['event'].wait() + await rpc_results[_id]['event'].wait() - ret = _rpc_results[_id]['result'] + ret = rpc_results[_id]['result'] - del _rpc_results[_id] + del rpc_results[_id] if ret.error is not None: raise Exception(json.dumps(ret.error, indent=4)) return ret - async def _recv_task(): + async def recv_task(): ''' receives every ws message and stores it in its corresponding result field, then sets the event to wakeup original sender tasks. @@ -221,17 +221,18 @@ async def open_jsonrpc_session( async for msg in ws: msg = dtype(**msg) - if msg.id not in _rpc_results: - # in case this message wasn't beign accounted for store it - _rpc_results[msg.id] = { - 'result': None, - 'event': trio.Event() - } + if msg.id not in rpc_results: + log.warning(f'Wasn\'t expecting ws msg: {json.dumps(msg, indent=4)}') - _rpc_results[msg.id]['result'] = msg - _rpc_results[msg.id]['event'].set() + res = rpc_results.setdefault( + msg.id, + {'result': None, 'event': trio.Event()} + ) + + res['result'] = msg + res['event'].set() - n.start_soon(_recv_task) + n.start_soon(recv_task) yield json_rpc n.cancel_scope.cancel()