Merge pull request #409 from esmegl/json_rpc_req

Added support for JSONRPC requests coming from the server side
dark_clearing_improvements
Guillermo Rodriguez 2022-12-21 15:14:12 -03:00 committed by GitHub
commit 7b14f498a8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 43 additions and 13 deletions

View File

@ -98,6 +98,11 @@ class JSONRPCResult(Struct):
usDiff: int usDiff: int
testnet: bool testnet: bool
class JSONRPCChannel(Struct):
jsonrpc: str = '2.0'
method: str
params: dict
class KLinesResult(Struct): class KLinesResult(Struct):
close: list[float] close: list[float]

View File

@ -23,6 +23,7 @@ from itertools import count
from types import ModuleType from types import ModuleType
from typing import Any, Optional, Callable, AsyncGenerator from typing import Any, Optional, Callable, AsyncGenerator
import json import json
import sys
import trio import trio
import trio_websocket import trio_websocket
@ -139,7 +140,7 @@ class NoBsWs:
async def open_autorecon_ws( async def open_autorecon_ws(
url: str, url: str,
# TODO: proper type annot smh # TODO: proper type cannot smh
fixture: Optional[Callable] = None, fixture: Optional[Callable] = None,
) -> AsyncGenerator[tuple[...], NoBsWs]: ) -> AsyncGenerator[tuple[...], NoBsWs]:
@ -169,12 +170,14 @@ class JSONRPCResult(Struct):
result: Optional[dict] = None result: Optional[dict] = None
error: Optional[dict] = None error: Optional[dict] = None
@asynccontextmanager @asynccontextmanager
async def open_jsonrpc_session( async def open_jsonrpc_session(
url: str, url: str,
start_id: int = 0, start_id: int = 0,
dtype: type = JSONRPCResult response_type: type = JSONRPCResult,
request_type: Optional[type] = None,
request_hook: Optional[Callable] = None,
error_hook: Optional[Callable] = None,
) -> Callable[[str, dict], dict]: ) -> Callable[[str, dict], dict]:
async with ( async with (
@ -219,20 +222,42 @@ async def open_jsonrpc_session(
''' '''
receives every ws message and stores it in its corresponding result receives every ws message and stores it in its corresponding result
field, then sets the event to wakeup original sender tasks. field, then sets the event to wakeup original sender tasks.
also recieves responses to requests originated from the server side.
''' '''
async for msg in ws: async for msg in ws:
msg = dtype(**msg) match msg:
case {
'result': result,
'id': mid,
} if res_entry := rpc_results.get(mid):
if msg.id not in rpc_results: res_entry['result'] = response_type(**msg)
log.warning(f'Wasn\'t expecting ws msg: {json.dumps(msg, indent=4)}') res_entry['event'].set()
res = rpc_results.setdefault( case {
msg.id, 'result': _,
{'result': None, 'event': trio.Event()} 'id': mid,
) } if not rpc_results.get(mid):
log.warning(f'Wasn\'t expecting ws msg: {json.dumps(msg, indent=4)}')
res['result'] = msg case {
res['event'].set() 'method': _,
'params': _,
}:
log.debug(f'Recieved\n{msg}')
if request_hook:
await request_hook(request_type(**msg))
case {
'error': error
}:
log.warning(f'Recieved\n{error}')
if error_hook:
await error_hook(response_type(**msg))
case _:
log.warning(f'Unhandled JSON-RPC msg!?\n{msg}')
n.start_soon(recv_task) n.start_soon(recv_task)