diff --git a/piker/data/_web_bs.py b/piker/data/_web_bs.py index a401588d..e9d2deeb 100644 --- a/piker/data/_web_bs.py +++ b/piker/data/_web_bs.py @@ -377,17 +377,22 @@ async def open_jsonrpc_session( url: str, start_id: int = 0, response_type: type = JSONRPCResult, - # request_type: Optional[type] = None, - # request_hook: Optional[Callable] = None, - # error_hook: Optional[Callable] = None, ) -> Callable[[str, dict], dict]: + ''' + Init a json-RPC-over-websocket connection to the provided `url`. + A `json_rpc: Callable[[str, dict], dict` is delivered to the + caller for sending requests and a bg-`trio.Task` handles + processing of response msgs including error reporting/raising in + the parent/caller task. + + ''' # NOTE, store all request msgs so we can raise errors on the # caller side! req_msgs: dict[int, dict] = {} async with ( - trio.open_nursery() as n, + trio.open_nursery() as tn, open_autorecon_ws(url) as ws ): rpc_id: Iterable[int] = count(start_id) @@ -470,15 +475,10 @@ async def open_jsonrpc_session( 'params': _, }: log.debug(f'Recieved\n{msg}') - # if request_hook: - # await request_hook(request_type(**msg)) case { 'error': error }: - # if error_hook: - # await error_hook(response_type(**msg)) - # retreive orig request msg, set error # response in original "result" msg, # THEN FINALLY set the event to signal caller @@ -497,6 +497,6 @@ async def open_jsonrpc_session( case _: log.warning(f'Unhandled JSON-RPC msg!?\n{msg}') - n.start_soon(recv_task) + tn.start_soon(recv_task) yield json_rpc - n.cancel_scope.cancel() + tn.cancel_scope.cancel()