Compare commits

..

No commits in common. "278cb87f43146cb394d3eb142fc799c00b77606d" and "a4f7fa9c1a10e61091f297949482fdf899a2a6f2" have entirely different histories.

2 changed files with 18 additions and 51 deletions

View File

@ -466,7 +466,7 @@ class Client:
# repack in name-keyed table # repack in name-keyed table
return { return {
pair.instrument_name.lower(): pair pair['instrument_name'].lower(): pair
for pair in matches.values() for pair in matches.values()
} }

View File

@ -273,7 +273,7 @@ async def _reconnect_forever(
nobsws._connected.set() nobsws._connected.set()
await trio.sleep_forever() await trio.sleep_forever()
except HandshakeError: except HandshakeError:
log.exception('Retrying connection') log.exception(f'Retrying connection')
# ws & nursery block ends # ws & nursery block ends
@ -359,8 +359,8 @@ async def open_autorecon_ws(
''' '''
JSONRPC response-request style machinery for transparent multiplexing JSONRPC response-request style machinery for transparent multiplexing of msgs
of msgs over a NoBsWs. over a NoBsWs.
''' '''
@ -377,20 +377,16 @@ async def open_jsonrpc_session(
url: str, url: str,
start_id: int = 0, start_id: int = 0,
response_type: type = JSONRPCResult, response_type: type = JSONRPCResult,
# request_type: Optional[type] = None, request_type: Optional[type] = None,
# request_hook: Optional[Callable] = None, request_hook: Optional[Callable] = None,
# error_hook: Optional[Callable] = None, error_hook: Optional[Callable] = None,
) -> Callable[[str, dict], dict]: ) -> Callable[[str, dict], dict]:
# NOTE, store all request msgs so we can raise errors on the
# caller side!
req_msgs: dict[int, dict] = {}
async with ( async with (
trio.open_nursery() as n, trio.open_nursery() as n,
open_autorecon_ws(url) as ws open_autorecon_ws(url) as ws
): ):
rpc_id: Iterable[int] = count(start_id) rpc_id: Iterable = count(start_id)
rpc_results: dict[int, dict] = {} rpc_results: dict[int, dict] = {}
async def json_rpc(method: str, params: dict) -> dict: async def json_rpc(method: str, params: dict) -> dict:
@ -398,40 +394,26 @@ async def open_jsonrpc_session(
perform a json rpc call and wait for the result, raise exception in perform a json rpc call and wait for the result, raise exception in
case of error field present on response case of error field present on response
''' '''
nonlocal req_msgs
req_id: int = next(rpc_id)
msg = { msg = {
'jsonrpc': '2.0', 'jsonrpc': '2.0',
'id': req_id, 'id': next(rpc_id),
'method': method, 'method': method,
'params': params 'params': params
} }
_id = msg['id'] _id = msg['id']
result = rpc_results[_id] = { rpc_results[_id] = {
'result': None, 'result': None,
'error': None, 'event': trio.Event()
'event': trio.Event(), # signal caller resp arrived
} }
req_msgs[_id] = msg
await ws.send_msg(msg) await ws.send_msg(msg)
# wait for reponse before unblocking requester code
await rpc_results[_id]['event'].wait() await rpc_results[_id]['event'].wait()
if (maybe_result := result['result']): ret = rpc_results[_id]['result']
ret = maybe_result
del rpc_results[_id]
else: del rpc_results[_id]
err = result['error']
raise Exception(
f'JSONRPC request failed\n'
f'req: {msg}\n'
f'resp: {err}\n'
)
if ret.error is not None: if ret.error is not None:
raise Exception(json.dumps(ret.error, indent=4)) raise Exception(json.dumps(ret.error, indent=4))
@ -446,7 +428,6 @@ async def open_jsonrpc_session(
the server side. the server side.
''' '''
nonlocal req_msgs
async for msg in ws: async for msg in ws:
match msg: match msg:
case { case {
@ -470,29 +451,15 @@ async def open_jsonrpc_session(
'params': _, 'params': _,
}: }:
log.debug(f'Recieved\n{msg}') log.debug(f'Recieved\n{msg}')
# if request_hook: if request_hook:
# await request_hook(request_type(**msg)) await request_hook(request_type(**msg))
case { case {
'error': error 'error': error
}: }:
# if error_hook: log.warning(f'Recieved\n{error}')
# await error_hook(response_type(**msg)) if error_hook:
await error_hook(response_type(**msg))
# retreive orig request msg, set error
# response in original "result" msg,
# THEN FINALLY set the event to signal caller
# to raise the error in the parent task.
req_id: int = msg['id']
req_msg: dict = req_msgs[req_id]
result: dict = rpc_results[req_id]
result['error'] = error
result['event'].set()
log.error(
f'JSONRPC request failed\n'
f'req: {req_msg}\n'
f'resp: {error}\n'
)
case _: case _:
log.warning(f'Unhandled JSON-RPC msg!?\n{msg}') log.warning(f'Unhandled JSON-RPC msg!?\n{msg}')