From bdc3bc9219fd7e28e74a242b9206b1fe14cbaa49 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 19 Nov 2024 16:58:40 -0500 Subject: [PATCH] Mk jsronrpc's underlying ws timeout `float('inf')` Since currently we're only using this IPC subsys for `deribit`, and generally speaking we're primarly supporting options markets (which are fairly "slow moving"), flip to a default of NOT resetting the `NoBsWs` on timeout since doing so normally breaks the jsron-rpc IPC session. Without a proper `fixture` passed to `open_autorecon_ws()` (which we should eventually implement!!) relying on a timeout-to-reset more or less will just cause breakage issues - a proper reconnect sequence must be implemented before using that feature. Deats, - expose and proxy through the `msg_recv_timeout` from `open_jsonrpc_session()` into the underlying `open_autorecon_ws()` call. --- piker/data/_web_bs.py | 22 +++++++++++++++++++--- 1 file changed, 19 insertions(+), 3 deletions(-) diff --git a/piker/data/_web_bs.py b/piker/data/_web_bs.py index aaca7b41..f95c2977 100644 --- a/piker/data/_web_bs.py +++ b/piker/data/_web_bs.py @@ -360,7 +360,7 @@ async def open_autorecon_ws( ''' JSONRPC response-request style machinery for transparent multiplexing -of msgs over a NoBsWs. +of msgs over a `NoBsWs`. ''' @@ -377,6 +377,16 @@ async def open_jsonrpc_session( url: str, start_id: int = 0, response_type: type = JSONRPCResult, + msg_recv_timeout: float = float('inf'), + # ^NOTE, since only `deribit` is using this jsonrpc stuff atm + # and options mkts are generally "slow moving".. + # + # FURTHER if we break the underlying ws connection then since we + # don't pass a `fixture` to the task that manages `NoBsWs`, i.e. + # `_reconnect_forever()`, the jsonrpc "transport pipe" get's + # broken and never restored with wtv init sequence is required to + # re-establish a working req-resp session. + # request_type: Optional[type] = None, # request_hook: Optional[Callable] = None, # error_hook: Optional[Callable] = None, @@ -388,12 +398,18 @@ async def open_jsonrpc_session( async with ( trio.open_nursery() as n, - open_autorecon_ws(url) as ws + open_autorecon_ws( + url=url, + msg_recv_timeout=msg_recv_timeout, + ) as ws ): rpc_id: Iterable[int] = count(start_id) rpc_results: dict[int, dict] = {} - async def json_rpc(method: str, params: dict) -> dict: + async def json_rpc( + method: str, + params: dict, + ) -> dict: ''' perform a json rpc call and wait for the result, raise exception in case of error field present on response