Add brokercheck test and got deribit to dump l1 and trades to console
							parent
							
								
									7acc4e3208
								
							
						
					
					
						commit
						6df181c233
					
				| 
						 | 
				
			
			@ -39,6 +39,97 @@ _config_dir = click.get_app_dir('piker')
 | 
			
		|||
_watchlists_data_path = os.path.join(_config_dir, 'watchlists.json')
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@cli.command()
 | 
			
		||||
@click.option('--loglevel', '-l', default='info', help='Logging level')
 | 
			
		||||
@click.argument('broker', nargs=1, required=True)
 | 
			
		||||
@click.pass_obj
 | 
			
		||||
def brokercheck(config, loglevel, broker):
 | 
			
		||||
    '''
 | 
			
		||||
    Test broker apis for completeness.
 | 
			
		||||
 | 
			
		||||
    '''
 | 
			
		||||
    log = get_console_log(loglevel)
 | 
			
		||||
 | 
			
		||||
    async def run_method(client, meth_name: str, **kwargs):
 | 
			
		||||
        log.info(f'checking client for method \'{meth_name}\'...')
 | 
			
		||||
        method = getattr(client, meth_name, None)
 | 
			
		||||
        assert method
 | 
			
		||||
        log.info('found!, running...')
 | 
			
		||||
        result = await method(**kwargs)
 | 
			
		||||
        log.info(f'done! result: {type(result)}')
 | 
			
		||||
        return result
 | 
			
		||||
 | 
			
		||||
    async def run_test(broker_name: str):
 | 
			
		||||
        brokermod = get_brokermod(broker_name)
 | 
			
		||||
        total = 0
 | 
			
		||||
        passed = 0
 | 
			
		||||
        failed = 0
 | 
			
		||||
        
 | 
			
		||||
        log.info(f'getting client...')
 | 
			
		||||
        if not hasattr(brokermod, 'get_client'):
 | 
			
		||||
            log.error('fail! no \'get_client\' context manager found.')
 | 
			
		||||
            return
 | 
			
		||||
 | 
			
		||||
        async with brokermod.get_client() as client:
 | 
			
		||||
            log.info(f'done! inside client context.')
 | 
			
		||||
 | 
			
		||||
            # check for methods present on brokermod
 | 
			
		||||
            method_list = [
 | 
			
		||||
                'stream_messages',
 | 
			
		||||
                'open_history_client',
 | 
			
		||||
                'backfill_bars',
 | 
			
		||||
                'stream_quotes',
 | 
			
		||||
                'open_symbol_search'
 | 
			
		||||
            ]
 | 
			
		||||
 | 
			
		||||
            for method in method_list:
 | 
			
		||||
                log.info(
 | 
			
		||||
                    f'checking brokermod for method \'{method}\'...')
 | 
			
		||||
                if not hasattr(brokermod, method):
 | 
			
		||||
                    log.error(f'fail! method \'{method}\' not found.')
 | 
			
		||||
                    failed += 1
 | 
			
		||||
                else:
 | 
			
		||||
                    log.info('done!')
 | 
			
		||||
                    passed += 1
 | 
			
		||||
 | 
			
		||||
                total += 1
 | 
			
		||||
 | 
			
		||||
            # check for methods present con brokermod.Client and their
 | 
			
		||||
            # results
 | 
			
		||||
 | 
			
		||||
            syms = await run_method(client, 'symbol_info')
 | 
			
		||||
            total += 1
 | 
			
		||||
            
 | 
			
		||||
            if len(syms) == 0:
 | 
			
		||||
                raise BaseException('Empty Symbol list?')
 | 
			
		||||
 | 
			
		||||
            passed += 1
 | 
			
		||||
 | 
			
		||||
            first_sym = tuple(syms.keys())[0]
 | 
			
		||||
 | 
			
		||||
            method_list = [
 | 
			
		||||
                ('cache_symbols', {}),
 | 
			
		||||
                ('search_symbols', {'pattern': first_sym[:-1]}),
 | 
			
		||||
                ('bars', {'symbol': first_sym})
 | 
			
		||||
            ]
 | 
			
		||||
            
 | 
			
		||||
            for method_name, method_kwargs in method_list:
 | 
			
		||||
                try:
 | 
			
		||||
                    await run_method(client, method_name, **method_kwargs)
 | 
			
		||||
                    passed += 1
 | 
			
		||||
 | 
			
		||||
                except AssertionError:
 | 
			
		||||
                    log.error(f'fail! method \'{method_name}\' not found.')
 | 
			
		||||
                    failed += 1
 | 
			
		||||
 | 
			
		||||
                total += 1
 | 
			
		||||
 | 
			
		||||
            log.info(f'total: {total}, passed: {passed}, failed: {failed}')
 | 
			
		||||
 | 
			
		||||
    trio.run(run_test, broker)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@cli.command()
 | 
			
		||||
@click.option('--keys', '-k', multiple=True,
 | 
			
		||||
              help='Return results only for these keys')
 | 
			
		||||
| 
						 | 
				
			
			@ -193,6 +284,8 @@ def contracts(ctx, loglevel, broker, symbol, ids):
 | 
			
		|||
    brokermod = get_brokermod(broker)
 | 
			
		||||
    get_console_log(loglevel)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
    contracts = trio.run(partial(core.contracts, brokermod, symbol))
 | 
			
		||||
    if not ids:
 | 
			
		||||
        # just print out expiry dates which can be used with
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -18,10 +18,11 @@
 | 
			
		|||
Deribit backend
 | 
			
		||||
 | 
			
		||||
"""
 | 
			
		||||
import asyncio
 | 
			
		||||
from contextlib import asynccontextmanager as acm
 | 
			
		||||
from datetime import datetime
 | 
			
		||||
from typing import (
 | 
			
		||||
    Any, Union, Optional,
 | 
			
		||||
    Any, Union, Optional, List,
 | 
			
		||||
    AsyncGenerator, Callable,
 | 
			
		||||
)
 | 
			
		||||
import time
 | 
			
		||||
| 
						 | 
				
			
			@ -33,10 +34,12 @@ import asks
 | 
			
		|||
from fuzzywuzzy import process as fuzzy
 | 
			
		||||
import numpy as np
 | 
			
		||||
import tractor
 | 
			
		||||
from tractor import to_asyncio
 | 
			
		||||
from pydantic.dataclasses import dataclass
 | 
			
		||||
from pydantic import BaseModel
 | 
			
		||||
import wsproto
 | 
			
		||||
 | 
			
		||||
from .. import config
 | 
			
		||||
from .._cacheables import open_cached_client
 | 
			
		||||
from ._util import resproc, SymbolNotFound
 | 
			
		||||
from ..log import get_logger, get_console_log
 | 
			
		||||
| 
						 | 
				
			
			@ -50,7 +53,31 @@ from cryptofeed.callback import (
 | 
			
		|||
    L1BookCallback,
 | 
			
		||||
    TradeCallback
 | 
			
		||||
)
 | 
			
		||||
from cryptofeed.defines import DERIBIT, L1_BOOK, TRADES 
 | 
			
		||||
from cryptofeed.defines import (
 | 
			
		||||
    DERIBIT, L1_BOOK, TRADES, OPTION, CALL, PUT
 | 
			
		||||
)
 | 
			
		||||
from cryptofeed.symbols import Symbol
 | 
			
		||||
 | 
			
		||||
_spawn_kwargs = {
 | 
			
		||||
    'infect_asyncio': True,
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def get_config() -> dict[str, Any]:
 | 
			
		||||
 | 
			
		||||
    conf, path = config.load()
 | 
			
		||||
 | 
			
		||||
    section = conf.get('deribit')
 | 
			
		||||
 | 
			
		||||
    if section is None:
 | 
			
		||||
        log.warning(f'No config section found for deribit in {path}')
 | 
			
		||||
        return {}
 | 
			
		||||
 | 
			
		||||
    conf['log'] = {}
 | 
			
		||||
    conf['log']['filename'] = 'feedhandler.log'
 | 
			
		||||
    conf['log']['level'] = 'WARNING'
 | 
			
		||||
 | 
			
		||||
    return conf 
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
log = get_logger(__name__)
 | 
			
		||||
| 
						 | 
				
			
			@ -126,16 +153,13 @@ class Client:
 | 
			
		|||
 | 
			
		||||
        # will retrieve all symbols by default
 | 
			
		||||
        params = {
 | 
			
		||||
            'currency': currency.to_upper(),
 | 
			
		||||
            'currency': currency.upper(),
 | 
			
		||||
            'kind': kind,
 | 
			
		||||
            'expired': expired
 | 
			
		||||
            'expired': str(expired).lower()
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        resp = await self._api(
 | 
			
		||||
            'get_instrument', params=params)
 | 
			
		||||
 | 
			
		||||
        if 'result' in resp:
 | 
			
		||||
            raise SymbolNotFound
 | 
			
		||||
            'get_instruments', params=params)
 | 
			
		||||
 | 
			
		||||
        results = resp['result']
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -246,20 +270,43 @@ async def open_aio_cryptofeed_relay(
 | 
			
		|||
    instruments: List[str] = []
 | 
			
		||||
) -> None:
 | 
			
		||||
 | 
			
		||||
    async def trade_cb(feed, instrument, data: dict, receipt_timestamp):
 | 
			
		||||
    conf = get_config()
 | 
			
		||||
    
 | 
			
		||||
    def format_sym(name: str) -> str:
 | 
			
		||||
        base, expiry_date, strike_price, option_type = tuple(
 | 
			
		||||
            name.upper().split('-'))
 | 
			
		||||
 | 
			
		||||
        quote = base
 | 
			
		||||
 | 
			
		||||
        if option_type == 'P':
 | 
			
		||||
            option_type = PUT 
 | 
			
		||||
        elif option_type == 'C':
 | 
			
		||||
            option_type = CALL
 | 
			
		||||
        else:
 | 
			
		||||
            raise BaseException("Instrument name must end in 'c' for calls or 'p' for puts")
 | 
			
		||||
 | 
			
		||||
        return Symbol(
 | 
			
		||||
            base, quote,
 | 
			
		||||
            type=OPTION,
 | 
			
		||||
            strike_price=strike_price,
 | 
			
		||||
            option_type=option_type,
 | 
			
		||||
            expiry_date=expiry_date.upper()).normalized 
 | 
			
		||||
 | 
			
		||||
    instruments = [format_sym(i) for i in instruments]
 | 
			
		||||
 | 
			
		||||
    async def trade_cb(data: dict, receipt_timestamp):
 | 
			
		||||
        to_trio.send_nowait({
 | 
			
		||||
            'type': 'trade',
 | 
			
		||||
            instrument: data,
 | 
			
		||||
            data.symbol: data.to_dict(),
 | 
			
		||||
            'receipt': receipt_timestamp}) 
 | 
			
		||||
 | 
			
		||||
    async def l1_book_cb(feed, instrument, data: dict, receipt_timestamp):
 | 
			
		||||
    async def l1_book_cb(data: dict, receipt_timestamp):
 | 
			
		||||
        to_trio.send_nowait({
 | 
			
		||||
            'type': 'l1_book',
 | 
			
		||||
            instrument: data,
 | 
			
		||||
            'receipt': receipt_timestamp}) 
 | 
			
		||||
            data.symbol: data.to_dict(),
 | 
			
		||||
            'receipt': receipt_timestamp})
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
    fh = FeedHandler()
 | 
			
		||||
    fh = FeedHandler(config=conf)
 | 
			
		||||
    fh.run(start_loop=False)
 | 
			
		||||
 | 
			
		||||
    fh.add_feed(
 | 
			
		||||
| 
						 | 
				
			
			@ -281,7 +328,7 @@ async def open_aio_cryptofeed_relay(
 | 
			
		|||
 | 
			
		||||
 | 
			
		||||
@acm
 | 
			
		||||
async def open_cryptofeeds():
 | 
			
		||||
async def open_cryptofeeds(instruments: List[str]):
 | 
			
		||||
 | 
			
		||||
    # try:
 | 
			
		||||
    event_table = {}
 | 
			
		||||
| 
						 | 
				
			
			@ -290,7 +337,7 @@ async def open_cryptofeeds():
 | 
			
		|||
        to_asyncio.open_channel_from(
 | 
			
		||||
            open_aio_cryptofeed_relay,
 | 
			
		||||
            event_consumers=event_table,
 | 
			
		||||
            instruments=['BTC-10JUN22-30000-C']
 | 
			
		||||
            instruments=instruments
 | 
			
		||||
        ) as (first, chan),
 | 
			
		||||
        trio.open_nursery() as n,
 | 
			
		||||
    ):
 | 
			
		||||
| 
						 | 
				
			
			@ -303,7 +350,7 @@ async def open_cryptofeeds():
 | 
			
		|||
 | 
			
		||||
        n.start_soon(relay_events)
 | 
			
		||||
 | 
			
		||||
        yield None
 | 
			
		||||
        yield chan 
 | 
			
		||||
        
 | 
			
		||||
        await chan.send(None)
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -372,12 +419,15 @@ async def stream_quotes(
 | 
			
		|||
    async with (
 | 
			
		||||
        open_cached_client('deribit') as client,
 | 
			
		||||
        send_chan as send_chan,
 | 
			
		||||
        open_cryptofeeds(symbols) as feed_chan
 | 
			
		||||
    ):
 | 
			
		||||
 | 
			
		||||
        # keep client cached for real-time section
 | 
			
		||||
        cache = await client.cache_symbols()
 | 
			
		||||
 | 
			
		||||
        breakpoint() 
 | 
			
		||||
        async with feed_chan.subscribe() as msg_stream:
 | 
			
		||||
            async for msg in msg_stream:
 | 
			
		||||
                print(msg)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@tractor.context
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
		Reference in New Issue