data._web_bs: try to raise jsonrpc errors in parent task

deribit_fix
Tyler Goodlet 2024-11-09 15:14:34 -05:00 committed by Nelson Torres
parent a4f7fa9c1a
commit 6fa0d4bcf3
1 changed files with 50 additions and 17 deletions

View File

@ -273,7 +273,7 @@ async def _reconnect_forever(
nobsws._connected.set() nobsws._connected.set()
await trio.sleep_forever() await trio.sleep_forever()
except HandshakeError: except HandshakeError:
log.exception(f'Retrying connection') log.exception('Retrying connection')
# ws & nursery block ends # ws & nursery block ends
@ -359,8 +359,8 @@ async def open_autorecon_ws(
''' '''
JSONRPC response-request style machinery for transparent multiplexing of msgs JSONRPC response-request style machinery for transparent multiplexing
over a NoBsWs. of msgs over a NoBsWs.
''' '''
@ -377,16 +377,20 @@ async def open_jsonrpc_session(
url: str, url: str,
start_id: int = 0, start_id: int = 0,
response_type: type = JSONRPCResult, response_type: type = JSONRPCResult,
request_type: Optional[type] = None, # request_type: Optional[type] = None,
request_hook: Optional[Callable] = None, # request_hook: Optional[Callable] = None,
error_hook: Optional[Callable] = None, # error_hook: Optional[Callable] = None,
) -> Callable[[str, dict], dict]: ) -> Callable[[str, dict], dict]:
# NOTE, store all request msgs so we can raise errors on the
# caller side!
req_msgs: dict[int, dict] = {}
async with ( async with (
trio.open_nursery() as n, trio.open_nursery() as n,
open_autorecon_ws(url) as ws open_autorecon_ws(url) as ws
): ):
rpc_id: Iterable = count(start_id) rpc_id: Iterable[int] = count(start_id)
rpc_results: dict[int, dict] = {} rpc_results: dict[int, dict] = {}
async def json_rpc(method: str, params: dict) -> dict: async def json_rpc(method: str, params: dict) -> dict:
@ -394,27 +398,41 @@ async def open_jsonrpc_session(
perform a json rpc call and wait for the result, raise exception in perform a json rpc call and wait for the result, raise exception in
case of error field present on response case of error field present on response
''' '''
nonlocal req_msgs
req_id: int = next(rpc_id)
msg = { msg = {
'jsonrpc': '2.0', 'jsonrpc': '2.0',
'id': next(rpc_id), 'id': req_id,
'method': method, 'method': method,
'params': params 'params': params
} }
_id = msg['id'] _id = msg['id']
rpc_results[_id] = { result = rpc_results[_id] = {
'result': None, 'result': None,
'event': trio.Event() 'error': None,
'event': trio.Event(), # signal caller resp arrived
} }
req_msgs[_id] = msg
await ws.send_msg(msg) await ws.send_msg(msg)
# wait for reponse before unblocking requester code
await rpc_results[_id]['event'].wait() await rpc_results[_id]['event'].wait()
ret = rpc_results[_id]['result'] if (maybe_result := result['result']):
ret = maybe_result
del rpc_results[_id] del rpc_results[_id]
else:
err = result['error']
raise Exception(
f'JSONRPC request failed\n'
f'req: {msg}\n'
f'resp: {err}\n'
)
if ret.error is not None: if ret.error is not None:
raise Exception(json.dumps(ret.error, indent=4)) raise Exception(json.dumps(ret.error, indent=4))
@ -428,6 +446,7 @@ async def open_jsonrpc_session(
the server side. the server side.
''' '''
nonlocal req_msgs
async for msg in ws: async for msg in ws:
match msg: match msg:
case { case {
@ -451,15 +470,29 @@ async def open_jsonrpc_session(
'params': _, 'params': _,
}: }:
log.debug(f'Recieved\n{msg}') log.debug(f'Recieved\n{msg}')
if request_hook: # if request_hook:
await request_hook(request_type(**msg)) # await request_hook(request_type(**msg))
case { case {
'error': error 'error': error
}: }:
log.warning(f'Recieved\n{error}') # if error_hook:
if error_hook: # await error_hook(response_type(**msg))
await error_hook(response_type(**msg))
# retreive orig request msg, set error
# response in original "result" msg,
# THEN FINALLY set the event to signal caller
# to raise the error in the parent task.
req_id: int = error['id']
req_msg: dict = req_msgs[req_id]
result: dict = rpc_results[req_id]
result['error'] = error
result['event'].set()
log.error(
f'JSONRPC request failed\n'
f'req: {req_msg}\n'
f'resp: {error}\n'
)
case _: case _:
log.warning(f'Unhandled JSON-RPC msg!?\n{msg}') log.warning(f'Unhandled JSON-RPC msg!?\n{msg}')