Compare commits

..

No commits in common. "278cb87f43146cb394d3eb142fc799c00b77606d" and "a4f7fa9c1a10e61091f297949482fdf899a2a6f2" have entirely different histories.

2 changed files with 18 additions and 51 deletions

View File

@ -466,7 +466,7 @@ class Client:
# repack in name-keyed table
return {
pair.instrument_name.lower(): pair
pair['instrument_name'].lower(): pair
for pair in matches.values()
}

View File

@ -273,7 +273,7 @@ async def _reconnect_forever(
nobsws._connected.set()
await trio.sleep_forever()
except HandshakeError:
log.exception('Retrying connection')
log.exception(f'Retrying connection')
# ws & nursery block ends
@ -359,8 +359,8 @@ async def open_autorecon_ws(
'''
JSONRPC response-request style machinery for transparent multiplexing
of msgs over a NoBsWs.
JSONRPC response-request style machinery for transparent multiplexing of msgs
over a NoBsWs.
'''
@ -377,20 +377,16 @@ async def open_jsonrpc_session(
url: str,
start_id: int = 0,
response_type: type = JSONRPCResult,
# request_type: Optional[type] = None,
# request_hook: Optional[Callable] = None,
# error_hook: Optional[Callable] = None,
request_type: Optional[type] = None,
request_hook: Optional[Callable] = None,
error_hook: Optional[Callable] = None,
) -> Callable[[str, dict], dict]:
# NOTE, store all request msgs so we can raise errors on the
# caller side!
req_msgs: dict[int, dict] = {}
async with (
trio.open_nursery() as n,
open_autorecon_ws(url) as ws
):
rpc_id: Iterable[int] = count(start_id)
rpc_id: Iterable = count(start_id)
rpc_results: dict[int, dict] = {}
async def json_rpc(method: str, params: dict) -> dict:
@ -398,40 +394,26 @@ async def open_jsonrpc_session(
perform a json rpc call and wait for the result, raise exception in
case of error field present on response
'''
nonlocal req_msgs
req_id: int = next(rpc_id)
msg = {
'jsonrpc': '2.0',
'id': req_id,
'id': next(rpc_id),
'method': method,
'params': params
}
_id = msg['id']
result = rpc_results[_id] = {
rpc_results[_id] = {
'result': None,
'error': None,
'event': trio.Event(), # signal caller resp arrived
'event': trio.Event()
}
req_msgs[_id] = msg
await ws.send_msg(msg)
# wait for reponse before unblocking requester code
await rpc_results[_id]['event'].wait()
if (maybe_result := result['result']):
ret = maybe_result
del rpc_results[_id]
ret = rpc_results[_id]['result']
else:
err = result['error']
raise Exception(
f'JSONRPC request failed\n'
f'req: {msg}\n'
f'resp: {err}\n'
)
del rpc_results[_id]
if ret.error is not None:
raise Exception(json.dumps(ret.error, indent=4))
@ -446,7 +428,6 @@ async def open_jsonrpc_session(
the server side.
'''
nonlocal req_msgs
async for msg in ws:
match msg:
case {
@ -470,29 +451,15 @@ async def open_jsonrpc_session(
'params': _,
}:
log.debug(f'Recieved\n{msg}')
# if request_hook:
# await request_hook(request_type(**msg))
if request_hook:
await request_hook(request_type(**msg))
case {
'error': error
}:
# if error_hook:
# await error_hook(response_type(**msg))
# retreive orig request msg, set error
# response in original "result" msg,
# THEN FINALLY set the event to signal caller
# to raise the error in the parent task.
req_id: int = msg['id']
req_msg: dict = req_msgs[req_id]
result: dict = rpc_results[req_id]
result['error'] = error
result['event'].set()
log.error(
f'JSONRPC request failed\n'
f'req: {req_msg}\n'
f'resp: {error}\n'
)
log.warning(f'Recieved\n{error}')
if error_hook:
await error_hook(response_type(**msg))
case _:
log.warning(f'Unhandled JSON-RPC msg!?\n{msg}')