diff --git a/piker/brokers/cli.py b/piker/brokers/cli.py index b614a4fd..f3e89a87 100644 --- a/piker/brokers/cli.py +++ b/piker/brokers/cli.py @@ -39,6 +39,97 @@ _config_dir = click.get_app_dir('piker') _watchlists_data_path = os.path.join(_config_dir, 'watchlists.json') +@cli.command() +@click.option('--loglevel', '-l', default='info', help='Logging level') +@click.argument('broker', nargs=1, required=True) +@click.pass_obj +def brokercheck(config, loglevel, broker): + ''' + Test broker apis for completeness. + + ''' + log = get_console_log(loglevel) + + async def run_method(client, meth_name: str, **kwargs): + log.info(f'checking client for method \'{meth_name}\'...') + method = getattr(client, meth_name, None) + assert method + log.info('found!, running...') + result = await method(**kwargs) + log.info(f'done! result: {type(result)}') + return result + + async def run_test(broker_name: str): + brokermod = get_brokermod(broker_name) + total = 0 + passed = 0 + failed = 0 + + log.info(f'getting client...') + if not hasattr(brokermod, 'get_client'): + log.error('fail! no \'get_client\' context manager found.') + return + + async with brokermod.get_client() as client: + log.info(f'done! inside client context.') + + # check for methods present on brokermod + method_list = [ + 'stream_messages', + 'open_history_client', + 'backfill_bars', + 'stream_quotes', + 'open_symbol_search' + ] + + for method in method_list: + log.info( + f'checking brokermod for method \'{method}\'...') + if not hasattr(brokermod, method): + log.error(f'fail! method \'{method}\' not found.') + failed += 1 + else: + log.info('done!') + passed += 1 + + total += 1 + + # check for methods present con brokermod.Client and their + # results + + syms = await run_method(client, 'symbol_info') + total += 1 + + if len(syms) == 0: + raise BaseException('Empty Symbol list?') + + passed += 1 + + first_sym = tuple(syms.keys())[0] + + method_list = [ + ('cache_symbols', {}), + ('search_symbols', {'pattern': first_sym[:-1]}), + ('bars', {'symbol': first_sym}) + ] + + for method_name, method_kwargs in method_list: + try: + await run_method(client, method_name, **method_kwargs) + passed += 1 + + except AssertionError: + log.error(f'fail! method \'{method_name}\' not found.') + failed += 1 + + total += 1 + + log.info(f'total: {total}, passed: {passed}, failed: {failed}') + + trio.run(run_test, broker) + + + @cli.command() @click.option('--keys', '-k', multiple=True, help='Return results only for these keys') @@ -193,6 +284,8 @@ def contracts(ctx, loglevel, broker, symbol, ids): brokermod = get_brokermod(broker) get_console_log(loglevel) + + contracts = trio.run(partial(core.contracts, brokermod, symbol)) if not ids: # just print out expiry dates which can be used with diff --git a/piker/brokers/deribit.py b/piker/brokers/deribit.py index cbd48369..3f379313 100644 --- a/piker/brokers/deribit.py +++ b/piker/brokers/deribit.py @@ -18,10 +18,11 @@ Deribit backend """ +import asyncio from contextlib import asynccontextmanager as acm from datetime import datetime from typing import ( - Any, Union, Optional, + Any, Union, Optional, List, AsyncGenerator, Callable, ) import time @@ -33,10 +34,12 @@ import asks from fuzzywuzzy import process as fuzzy import numpy as np import tractor +from tractor import to_asyncio from pydantic.dataclasses import dataclass from pydantic import BaseModel import wsproto +from .. import config from .._cacheables import open_cached_client from ._util import resproc, SymbolNotFound from ..log import get_logger, get_console_log @@ -50,7 +53,31 @@ from cryptofeed.callback import ( L1BookCallback, TradeCallback ) -from cryptofeed.defines import DERIBIT, L1_BOOK, TRADES +from cryptofeed.defines import ( + DERIBIT, L1_BOOK, TRADES, OPTION, CALL, PUT +) +from cryptofeed.symbols import Symbol + +_spawn_kwargs = { + 'infect_asyncio': True, +} + + +def get_config() -> dict[str, Any]: + + conf, path = config.load() + + section = conf.get('deribit') + + if section is None: + log.warning(f'No config section found for deribit in {path}') + return {} + + conf['log'] = {} + conf['log']['filename'] = 'feedhandler.log' + conf['log']['level'] = 'WARNING' + + return conf log = get_logger(__name__) @@ -126,16 +153,13 @@ class Client: # will retrieve all symbols by default params = { - 'currency': currency.to_upper(), + 'currency': currency.upper(), 'kind': kind, - 'expired': expired + 'expired': str(expired).lower() } resp = await self._api( - 'get_instrument', params=params) - - if 'result' in resp: - raise SymbolNotFound + 'get_instruments', params=params) results = resp['result'] @@ -246,20 +270,43 @@ async def open_aio_cryptofeed_relay( instruments: List[str] = [] ) -> None: - async def trade_cb(feed, instrument, data: dict, receipt_timestamp): + conf = get_config() + + def format_sym(name: str) -> str: + base, expiry_date, strike_price, option_type = tuple( + name.upper().split('-')) + + quote = base + + if option_type == 'P': + option_type = PUT + elif option_type == 'C': + option_type = CALL + else: + raise BaseException("Instrument name must end in 'c' for calls or 'p' for puts") + + return Symbol( + base, quote, + type=OPTION, + strike_price=strike_price, + option_type=option_type, + expiry_date=expiry_date.upper()).normalized + + instruments = [format_sym(i) for i in instruments] + + async def trade_cb(data: dict, receipt_timestamp): to_trio.send_nowait({ 'type': 'trade', - instrument: data, + data.symbol: data.to_dict(), 'receipt': receipt_timestamp}) - async def l1_book_cb(feed, instrument, data: dict, receipt_timestamp): + async def l1_book_cb(data: dict, receipt_timestamp): to_trio.send_nowait({ 'type': 'l1_book', - instrument: data, - 'receipt': receipt_timestamp}) + data.symbol: data.to_dict(), + 'receipt': receipt_timestamp}) - - fh = FeedHandler() + fh = FeedHandler(config=conf) fh.run(start_loop=False) fh.add_feed( @@ -281,7 +328,7 @@ async def open_aio_cryptofeed_relay( @acm -async def open_cryptofeeds(): +async def open_cryptofeeds(instruments: List[str]): # try: event_table = {} @@ -290,7 +337,7 @@ async def open_cryptofeeds(): to_asyncio.open_channel_from( open_aio_cryptofeed_relay, event_consumers=event_table, - instruments=['BTC-10JUN22-30000-C'] + instruments=instruments ) as (first, chan), trio.open_nursery() as n, ): @@ -303,7 +350,7 @@ async def open_cryptofeeds(): n.start_soon(relay_events) - yield None + yield chan await chan.send(None) @@ -372,12 +419,15 @@ async def stream_quotes( async with ( open_cached_client('deribit') as client, send_chan as send_chan, + open_cryptofeeds(symbols) as feed_chan ): # keep client cached for real-time section cache = await client.cache_symbols() - breakpoint() + async with feed_chan.subscribe() as msg_stream: + async for msg in msg_stream: + print(msg) @tractor.context diff --git a/setup.py b/setup.py index 6e58ec6e..561c742d 100755 --- a/setup.py +++ b/setup.py @@ -61,6 +61,7 @@ setup( # brokers 'asks==2.4.8', 'ib_insync', + 'cryptofeed', # numerics 'pendulum', # easier datetimes