From 9626dbd7aced719fe2bd0122724c1242ee12f4a1 Mon Sep 17 00:00:00 2001 From: Guillermo Rodriguez Date: Thu, 25 Aug 2022 09:18:52 -0300 Subject: [PATCH] Simplify rpc machinery, and switch refs to Dict and List to builtins, make brokercheck call public broker methods and get their results again --- piker/brokers/cli.py | 213 ++++++++++++++++++++-------------- piker/brokers/deribit/api.py | 43 +++---- piker/brokers/deribit/feed.py | 2 +- 3 files changed, 148 insertions(+), 110 deletions(-) diff --git a/piker/brokers/cli.py b/piker/brokers/cli.py index 4079a236..0d84384d 100644 --- a/piker/brokers/cli.py +++ b/piker/brokers/cli.py @@ -39,6 +39,131 @@ _config_dir = click.get_app_dir('piker') _watchlists_data_path = os.path.join(_config_dir, 'watchlists.json') +OK = '\033[92m' +WARNING = '\033[93m' +FAIL = '\033[91m' +ENDC = '\033[0m' + + +def print_ok(s: str, **kwargs): + print(OK + s + ENDC, **kwargs) + + +def print_error(s: str, **kwargs): + print(FAIL + s + ENDC, **kwargs) + + +def get_method(client, meth_name: str): + print(f'checking client for method \'{meth_name}\'...', end='', flush=True) + method = getattr(client, meth_name, None) + assert method + print_ok('found!.') + return method + +async def run_method(client, meth_name: str, **kwargs): + method = get_method(client, meth_name) + print('running...', end='', flush=True) + result = await method(**kwargs) + print_ok(f'done! result: {type(result)}') + return result + +async def run_test(broker_name: str): + brokermod = get_brokermod(broker_name) + total = 0 + passed = 0 + failed = 0 + + print(f'getting client...', end='', flush=True) + if not hasattr(brokermod, 'get_client'): + print_error('fail! no \'get_client\' context manager found.') + return + + async with brokermod.get_client(is_brokercheck=True) as client: + print_ok(f'done! inside client context.') + + # check for methods present on brokermod + method_list = [ + 'backfill_bars', + 'get_client', + 'trades_dialogue', + 'open_history_client', + 'open_symbol_search', + 'stream_quotes', + + ] + + for method in method_list: + print( + f'checking brokermod for method \'{method}\'...', + end='', flush=True) + if not hasattr(brokermod, method): + print_error(f'fail! method \'{method}\' not found.') + failed += 1 + else: + print_ok('done!') + passed += 1 + + total += 1 + + # check for methods present con brokermod.Client and their + # results + + # for private methods only check is present + method_list = [ + 'get_balances', + 'get_assets', + 'get_trades', + 'get_xfers', + 'submit_limit', + 'submit_cancel', + 'search_symbols', + ] + + for method_name in method_list: + try: + get_method(client, method_name) + passed += 1 + + except AssertionError: + print_error(f'fail! method \'{method_name}\' not found.') + failed += 1 + + total += 1 + + + # check for methods present con brokermod.Client and their + # results + + syms = await run_method(client, 'symbol_info') + total += 1 + + if len(syms) == 0: + raise BaseException('Empty Symbol list?') + + passed += 1 + + first_sym = tuple(syms.keys())[0] + + method_list = [ + ('cache_symbols', {}), + ('search_symbols', {'pattern': first_sym[:-1]}), + ('bars', {'symbol': first_sym}) + ] + + for method_name, method_kwargs in method_list: + try: + await run_method(client, method_name, **method_kwargs) + passed += 1 + + except AssertionError: + print_error(f'fail! method \'{method_name}\' not found.') + failed += 1 + + total += 1 + + print(f'total: {total}, passed: {passed}, failed: {failed}') + + @cli.command() @click.argument('broker', nargs=1, required=True) @click.pass_obj @@ -47,90 +172,10 @@ def brokercheck(config, broker): Test broker apis for completeness. ''' - OK = '\033[92m' - WARNING = '\033[93m' - FAIL = '\033[91m' - ENDC = '\033[0m' - - def print_ok(s: str, **kwargs): - print(OK + s + ENDC, **kwargs) - - def print_error(s: str, **kwargs): - print(FAIL + s + ENDC, **kwargs) - - def get_method(client, meth_name: str): - print(f'checking client for method \'{meth_name}\'...', end='', flush=True) - method = getattr(client, meth_name, None) - assert method - print_ok('found!.') - return method - - async def run_test(broker_name: str): - brokermod = get_brokermod(broker_name) - total = 0 - passed = 0 - failed = 0 - - print(f'getting client...', end='', flush=True) - if not hasattr(brokermod, 'get_client'): - print_error('fail! no \'get_client\' context manager found.') - return - - async with brokermod.get_client(is_brokercheck=True) as client: - print_ok(f'done! inside client context.') - - # check for methods present on brokermod - method_list = [ - 'backfill_bars', - 'get_client', - 'trades_dialogue', - 'open_history_client', - 'open_symbol_search', - 'stream_quotes', - - ] - - for method in method_list: - print( - f'checking brokermod for method \'{method}\'...', - end='', flush=True) - if not hasattr(brokermod, method): - print_error(f'fail! method \'{method}\' not found.') - failed += 1 - else: - print_ok('done!') - passed += 1 - - total += 1 - - # check for methods present con brokermod.Client and their - # results - - # for private methods only check is present - method_list = [ - 'get_balances', - 'get_assets', - 'get_trades', - 'get_xfers', - 'submit_limit', - 'submit_cancel', - 'cache_symbols', - 'search_symbols', - 'bars' - ] - - for method_name in method_list: - try: - get_method(client, method_name) - passed += 1 - - except AssertionError: - print_error(f'fail! method \'{method_name}\' not found.') - failed += 1 - - total += 1 - - print(f'total: {total}, passed: {passed}, failed: {failed}') + async def bcheck_main(): + async with maybe_spawn_brokerd(broker) as portal: + await portal.run(run_test, broker) + await portal.cancel_actor() trio.run(run_test, broker) diff --git a/piker/brokers/deribit/api.py b/piker/brokers/deribit/api.py index 15f90e29..ab21b75b 100644 --- a/piker/brokers/deribit/api.py +++ b/piker/brokers/deribit/api.py @@ -26,7 +26,7 @@ from contextlib import asynccontextmanager as acm, AsyncExitStack from itertools import count from functools import partial from datetime import datetime -from typing import Any, List, Dict, Optional, Iterable, Callable +from typing import Any, Optional, Iterable, Callable import pendulum import asks @@ -91,14 +91,14 @@ class JSONRPCResult(Struct): class KLinesResult(Struct): - close: List[float] - cost: List[float] - high: List[float] - low: List[float] - open: List[float] + close: list[float] + cost: list[float] + high: list[float] + low: list[float] + open: list[float] status: str - ticks: List[int] - volume: List[float] + ticks: list[int] + volume: list[float] class Trade(Struct): trade_seq: int @@ -116,7 +116,7 @@ class Trade(Struct): amount: float class LastTradesResult(Struct): - trades: List[Trade] + trades: list[Trade] has_more: bool @@ -414,37 +414,28 @@ async def get_client( is_brokercheck: bool = False ) -> Client: - if is_brokercheck: - yield Client - return - async with ( trio.open_nursery() as n, open_autorecon_ws(_testnet_ws_url) as ws ): _rpc_id: Iterable = count(0) - _rpc_results: Dict[int, Dict] = {} + _rpc_results: dict[int, dict] = {} _expiry_time: int = float('inf') _access_token: Optional[str] = None _refresh_token: Optional[str] = None - def _next_json_body(method: str, params: Dict): - """get the typical json rpc 2.0 msg body and increment the req id + async def json_rpc(method: str, params: dict) -> dict: + """perform a json rpc call and wait for the result, raise exception in + case of error field present on response """ - return { + msg = { 'jsonrpc': '2.0', 'id': next(_rpc_id), 'method': method, 'params': params } - - async def json_rpc(method: str, params: Dict) -> Dict: - """perform a json rpc call and wait for the result, raise exception in - case of error field present on response - """ - msg = _next_json_body(method, params) _id = msg['id'] _rpc_results[_id] = { @@ -546,6 +537,7 @@ async def get_client( await client.cache_symbols() yield client + n.cancel_scope.cancel() @acm @@ -618,7 +610,8 @@ async def aio_price_feed_relay( @acm async def open_price_feed( - instrument: str) -> trio.abc.ReceiveStream: + instrument: str +) -> trio.abc.ReceiveStream: async with maybe_open_feed_handler() as fh: async with to_asyncio.open_channel_from( partial( @@ -684,7 +677,7 @@ async def aio_order_feed_relay( @acm async def open_order_feed( - instrument: List[str] + instrument: list[str] ) -> trio.abc.ReceiveStream: async with maybe_open_feed_handler() as fh: async with to_asyncio.open_channel_from( diff --git a/piker/brokers/deribit/feed.py b/piker/brokers/deribit/feed.py index a45861f7..714ef61b 100644 --- a/piker/brokers/deribit/feed.py +++ b/piker/brokers/deribit/feed.py @@ -20,7 +20,7 @@ Deribit backend. ''' from contextlib import asynccontextmanager as acm from datetime import datetime -from typing import Any, Optional, List, Callable +from typing import Any, Optional, Callable import time import trio