Simplify rpc machinery, and switch refs to Dict and List to builtins, make brokercheck call public broker methods and get their results again

size_in_shm_token
Guillermo Rodriguez 2022-08-25 09:18:52 -03:00
parent f286c79a03
commit 9626dbd7ac
No known key found for this signature in database
GPG Key ID: EC3AB66D5D83B392
3 changed files with 148 additions and 110 deletions

View File

@ -39,25 +39,20 @@ _config_dir = click.get_app_dir('piker')
_watchlists_data_path = os.path.join(_config_dir, 'watchlists.json') _watchlists_data_path = os.path.join(_config_dir, 'watchlists.json')
@cli.command()
@click.argument('broker', nargs=1, required=True)
@click.pass_obj
def brokercheck(config, broker):
'''
Test broker apis for completeness.
'''
OK = '\033[92m' OK = '\033[92m'
WARNING = '\033[93m' WARNING = '\033[93m'
FAIL = '\033[91m' FAIL = '\033[91m'
ENDC = '\033[0m' ENDC = '\033[0m'
def print_ok(s: str, **kwargs): def print_ok(s: str, **kwargs):
print(OK + s + ENDC, **kwargs) print(OK + s + ENDC, **kwargs)
def print_error(s: str, **kwargs): def print_error(s: str, **kwargs):
print(FAIL + s + ENDC, **kwargs) print(FAIL + s + ENDC, **kwargs)
def get_method(client, meth_name: str): def get_method(client, meth_name: str):
print(f'checking client for method \'{meth_name}\'...', end='', flush=True) print(f'checking client for method \'{meth_name}\'...', end='', flush=True)
method = getattr(client, meth_name, None) method = getattr(client, meth_name, None)
@ -65,6 +60,13 @@ def brokercheck(config, broker):
print_ok('found!.') print_ok('found!.')
return method return method
async def run_method(client, meth_name: str, **kwargs):
method = get_method(client, meth_name)
print('running...', end='', flush=True)
result = await method(**kwargs)
print_ok(f'done! result: {type(result)}')
return result
async def run_test(broker_name: str): async def run_test(broker_name: str):
brokermod = get_brokermod(broker_name) brokermod = get_brokermod(broker_name)
total = 0 total = 0
@ -114,9 +116,7 @@ def brokercheck(config, broker):
'get_xfers', 'get_xfers',
'submit_limit', 'submit_limit',
'submit_cancel', 'submit_cancel',
'cache_symbols',
'search_symbols', 'search_symbols',
'bars'
] ]
for method_name in method_list: for method_name in method_list:
@ -130,8 +130,53 @@ def brokercheck(config, broker):
total += 1 total += 1
# check for methods present con brokermod.Client and their
# results
syms = await run_method(client, 'symbol_info')
total += 1
if len(syms) == 0:
raise BaseException('Empty Symbol list?')
passed += 1
first_sym = tuple(syms.keys())[0]
method_list = [
('cache_symbols', {}),
('search_symbols', {'pattern': first_sym[:-1]}),
('bars', {'symbol': first_sym})
]
for method_name, method_kwargs in method_list:
try:
await run_method(client, method_name, **method_kwargs)
passed += 1
except AssertionError:
print_error(f'fail! method \'{method_name}\' not found.')
failed += 1
total += 1
print(f'total: {total}, passed: {passed}, failed: {failed}') print(f'total: {total}, passed: {passed}, failed: {failed}')
@cli.command()
@click.argument('broker', nargs=1, required=True)
@click.pass_obj
def brokercheck(config, broker):
'''
Test broker apis for completeness.
'''
async def bcheck_main():
async with maybe_spawn_brokerd(broker) as portal:
await portal.run(run_test, broker)
await portal.cancel_actor()
trio.run(run_test, broker) trio.run(run_test, broker)

View File

@ -26,7 +26,7 @@ from contextlib import asynccontextmanager as acm, AsyncExitStack
from itertools import count from itertools import count
from functools import partial from functools import partial
from datetime import datetime from datetime import datetime
from typing import Any, List, Dict, Optional, Iterable, Callable from typing import Any, Optional, Iterable, Callable
import pendulum import pendulum
import asks import asks
@ -91,14 +91,14 @@ class JSONRPCResult(Struct):
class KLinesResult(Struct): class KLinesResult(Struct):
close: List[float] close: list[float]
cost: List[float] cost: list[float]
high: List[float] high: list[float]
low: List[float] low: list[float]
open: List[float] open: list[float]
status: str status: str
ticks: List[int] ticks: list[int]
volume: List[float] volume: list[float]
class Trade(Struct): class Trade(Struct):
trade_seq: int trade_seq: int
@ -116,7 +116,7 @@ class Trade(Struct):
amount: float amount: float
class LastTradesResult(Struct): class LastTradesResult(Struct):
trades: List[Trade] trades: list[Trade]
has_more: bool has_more: bool
@ -414,37 +414,28 @@ async def get_client(
is_brokercheck: bool = False is_brokercheck: bool = False
) -> Client: ) -> Client:
if is_brokercheck:
yield Client
return
async with ( async with (
trio.open_nursery() as n, trio.open_nursery() as n,
open_autorecon_ws(_testnet_ws_url) as ws open_autorecon_ws(_testnet_ws_url) as ws
): ):
_rpc_id: Iterable = count(0) _rpc_id: Iterable = count(0)
_rpc_results: Dict[int, Dict] = {} _rpc_results: dict[int, dict] = {}
_expiry_time: int = float('inf') _expiry_time: int = float('inf')
_access_token: Optional[str] = None _access_token: Optional[str] = None
_refresh_token: Optional[str] = None _refresh_token: Optional[str] = None
def _next_json_body(method: str, params: Dict): async def json_rpc(method: str, params: dict) -> dict:
"""get the typical json rpc 2.0 msg body and increment the req id """perform a json rpc call and wait for the result, raise exception in
case of error field present on response
""" """
return { msg = {
'jsonrpc': '2.0', 'jsonrpc': '2.0',
'id': next(_rpc_id), 'id': next(_rpc_id),
'method': method, 'method': method,
'params': params 'params': params
} }
async def json_rpc(method: str, params: Dict) -> Dict:
"""perform a json rpc call and wait for the result, raise exception in
case of error field present on response
"""
msg = _next_json_body(method, params)
_id = msg['id'] _id = msg['id']
_rpc_results[_id] = { _rpc_results[_id] = {
@ -546,6 +537,7 @@ async def get_client(
await client.cache_symbols() await client.cache_symbols()
yield client yield client
n.cancel_scope.cancel()
@acm @acm
@ -618,7 +610,8 @@ async def aio_price_feed_relay(
@acm @acm
async def open_price_feed( async def open_price_feed(
instrument: str) -> trio.abc.ReceiveStream: instrument: str
) -> trio.abc.ReceiveStream:
async with maybe_open_feed_handler() as fh: async with maybe_open_feed_handler() as fh:
async with to_asyncio.open_channel_from( async with to_asyncio.open_channel_from(
partial( partial(
@ -684,7 +677,7 @@ async def aio_order_feed_relay(
@acm @acm
async def open_order_feed( async def open_order_feed(
instrument: List[str] instrument: list[str]
) -> trio.abc.ReceiveStream: ) -> trio.abc.ReceiveStream:
async with maybe_open_feed_handler() as fh: async with maybe_open_feed_handler() as fh:
async with to_asyncio.open_channel_from( async with to_asyncio.open_channel_from(

View File

@ -20,7 +20,7 @@ Deribit backend.
''' '''
from contextlib import asynccontextmanager as acm from contextlib import asynccontextmanager as acm
from datetime import datetime from datetime import datetime
from typing import Any, Optional, List, Callable from typing import Any, Optional, Callable
import time import time
import trio import trio