Compare commits
	
		
			No commits in common. "ab9a6f923629cbf0b6e5b7a44717202b0c69bf16" and "10121793f258b0451b9e20107bc21cf220837adc" have entirely different histories. 
		
	
	
		
			ab9a6f9236
			...
			10121793f2
		
	
		| 
						 | 
				
			
			@ -28,8 +28,6 @@ from decimal import (
 | 
			
		|||
    Decimal,
 | 
			
		||||
)
 | 
			
		||||
from functools import partial
 | 
			
		||||
from pathlib import Path
 | 
			
		||||
from pprint import pformat
 | 
			
		||||
import time
 | 
			
		||||
from typing import (
 | 
			
		||||
    Any,
 | 
			
		||||
| 
						 | 
				
			
			@ -39,6 +37,8 @@ from typing import (
 | 
			
		|||
 | 
			
		||||
from pendulum import now
 | 
			
		||||
import trio
 | 
			
		||||
from trio_typing import TaskStatus
 | 
			
		||||
from rapidfuzz import process as fuzzy
 | 
			
		||||
import numpy as np
 | 
			
		||||
from tractor.trionics import (
 | 
			
		||||
    broadcast_receiver,
 | 
			
		||||
| 
						 | 
				
			
			@ -52,16 +52,11 @@ from cryptofeed import FeedHandler
 | 
			
		|||
from cryptofeed.defines import (
 | 
			
		||||
    DERIBIT,
 | 
			
		||||
    L1_BOOK, TRADES,
 | 
			
		||||
    OPTION, CALL, PUT,
 | 
			
		||||
    OPEN_INTEREST,
 | 
			
		||||
    OPTION, CALL, PUT
 | 
			
		||||
)
 | 
			
		||||
from cryptofeed.symbols import Symbol
 | 
			
		||||
from cryptofeed.types import (
 | 
			
		||||
    L1Book,
 | 
			
		||||
    Trade,
 | 
			
		||||
    OpenInterest,
 | 
			
		||||
)
 | 
			
		||||
from piker.brokers import SymbolNotFound
 | 
			
		||||
# types for managing the cb callbacks.
 | 
			
		||||
# from cryptofeed.types import L1Book
 | 
			
		||||
from .venues import (
 | 
			
		||||
    _ws_url,
 | 
			
		||||
    MarketType,
 | 
			
		||||
| 
						 | 
				
			
			@ -69,7 +64,9 @@ from .venues import (
 | 
			
		|||
    Pair,
 | 
			
		||||
    OptionPair,
 | 
			
		||||
    JSONRPCResult,
 | 
			
		||||
    JSONRPCChannel,
 | 
			
		||||
    KLinesResult,
 | 
			
		||||
    Trade,
 | 
			
		||||
    LastTradesResult,
 | 
			
		||||
)
 | 
			
		||||
from piker.accounting import (
 | 
			
		||||
| 
						 | 
				
			
			@ -80,7 +77,7 @@ from piker.accounting import (
 | 
			
		|||
from piker.data import (
 | 
			
		||||
    def_iohlcv_fields,
 | 
			
		||||
    match_from_pairs,
 | 
			
		||||
    # Struct,
 | 
			
		||||
    Struct,
 | 
			
		||||
)
 | 
			
		||||
from piker.data._web_bs import (
 | 
			
		||||
    open_jsonrpc_session
 | 
			
		||||
| 
						 | 
				
			
			@ -99,21 +96,9 @@ _spawn_kwargs = {
 | 
			
		|||
}
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def deribit_timestamp(when: datetime) -> int:
 | 
			
		||||
    '''
 | 
			
		||||
    Convert conventional epoch timestamp, in secs, to unixtime in
 | 
			
		||||
    milliseconds.
 | 
			
		||||
 | 
			
		||||
    '''
 | 
			
		||||
    return int(
 | 
			
		||||
        (when.timestamp() * 1000)
 | 
			
		||||
        +
 | 
			
		||||
        (when.microsecond / 1000)
 | 
			
		||||
    )
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def get_timestamp_int(expiry_date: str) -> int:
 | 
			
		||||
    return int(time.mktime(time.strptime(expiry_date, '%d%b%y')))
 | 
			
		||||
# convert datetime obj timestamp to unixtime in milliseconds
 | 
			
		||||
def deribit_timestamp(when):
 | 
			
		||||
    return int((when.timestamp() * 1000) + (when.microsecond / 1000))
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def str_to_cb_sym(name: str) -> Symbol:
 | 
			
		||||
| 
						 | 
				
			
			@ -122,40 +107,32 @@ def str_to_cb_sym(name: str) -> Symbol:
 | 
			
		|||
    quote = base
 | 
			
		||||
 | 
			
		||||
    if option_type == 'put':
 | 
			
		||||
        option_type = PUT
 | 
			
		||||
    elif option_type == 'call':
 | 
			
		||||
        option_type = PUT 
 | 
			
		||||
    elif option_type  == 'call':
 | 
			
		||||
        option_type = CALL
 | 
			
		||||
    else:
 | 
			
		||||
        raise Exception("Couldn\'t parse option type")
 | 
			
		||||
 | 
			
		||||
    new_expiry_date: int = get_timestamp_int(
 | 
			
		||||
        get_values_from_cb_normalized_date(expiry_date)
 | 
			
		||||
    )
 | 
			
		||||
    new_expiry_date = get_values_from_cb_normalized_date(expiry_date)
 | 
			
		||||
 | 
			
		||||
    return Symbol(
 | 
			
		||||
        base=base,
 | 
			
		||||
        quote=quote,
 | 
			
		||||
        type=OPTION,
 | 
			
		||||
        strike_price=strike_price,
 | 
			
		||||
        option_type=option_type,
 | 
			
		||||
        expiry_date=new_expiry_date
 | 
			
		||||
    )
 | 
			
		||||
        expiry_date=new_expiry_date)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def piker_sym_to_cb_sym(name: str) -> Symbol:
 | 
			
		||||
    (
 | 
			
		||||
        base,
 | 
			
		||||
        expiry_date,
 | 
			
		||||
        strike_price,
 | 
			
		||||
        option_type,
 | 
			
		||||
    )= tuple(
 | 
			
		||||
    base, expiry_date, strike_price, option_type = tuple(
 | 
			
		||||
        name.upper().split('-'))
 | 
			
		||||
 | 
			
		||||
    new_expiry_date = get_timestamp_int(expiry_date)
 | 
			
		||||
    quote: str = base
 | 
			
		||||
    quote = base
 | 
			
		||||
 | 
			
		||||
    if option_type == 'P' or option_type == 'PUT':
 | 
			
		||||
        option_type = PUT
 | 
			
		||||
    elif option_type == 'C' or option_type == 'CALL':
 | 
			
		||||
    if option_type == 'P':
 | 
			
		||||
        option_type = PUT 
 | 
			
		||||
    elif option_type == 'C':
 | 
			
		||||
        option_type = CALL
 | 
			
		||||
    else:
 | 
			
		||||
        raise Exception("Couldn\'t parse option type")
 | 
			
		||||
| 
						 | 
				
			
			@ -166,32 +143,14 @@ def piker_sym_to_cb_sym(name: str) -> Symbol:
 | 
			
		|||
        type=OPTION,
 | 
			
		||||
        strike_price=strike_price,
 | 
			
		||||
        option_type=option_type,
 | 
			
		||||
        expiry_date=new_expiry_date
 | 
			
		||||
    )
 | 
			
		||||
        expiry_date=expiry_date)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
# TODO, instead can't we just lookup the `MktPair` directly
 | 
			
		||||
# and pass it upward to `stream_quotes()`??
 | 
			
		||||
def cb_sym_to_deribit_inst(sym: Symbol) -> str:
 | 
			
		||||
    '''
 | 
			
		||||
    Generate our own internal `str`-repr for a `cryptofeed.Symbol`
 | 
			
		||||
    uniquely from its fields.
 | 
			
		||||
 | 
			
		||||
    This is the equiv of generating a `Pair.fmqe` from `cryptofeed`
 | 
			
		||||
    for now i suppose..?
 | 
			
		||||
 | 
			
		||||
    '''
 | 
			
		||||
def cb_sym_to_deribit_inst(sym: Symbol):
 | 
			
		||||
    new_expiry_date = get_values_from_cb_normalized_date(sym.expiry_date)
 | 
			
		||||
    otype = (
 | 
			
		||||
        'C' if sym.option_type == CALL
 | 
			
		||||
        else 'P'
 | 
			
		||||
    )
 | 
			
		||||
    return (
 | 
			
		||||
        f'{sym.base}-'
 | 
			
		||||
        f'{new_expiry_date}-'
 | 
			
		||||
        f'{sym.strike_price}-'
 | 
			
		||||
        f'{otype}'
 | 
			
		||||
    )
 | 
			
		||||
    otype = 'C' if sym.option_type == CALL else 'P'
 | 
			
		||||
 | 
			
		||||
    return f'{sym.base}-{new_expiry_date}-{sym.strike_price}-{otype}'
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def get_values_from_cb_normalized_date(expiry_date: str) -> str:
 | 
			
		||||
| 
						 | 
				
			
			@ -220,39 +179,32 @@ def get_config() -> dict[str, Any]:
 | 
			
		|||
 | 
			
		||||
    conf: dict
 | 
			
		||||
    path: Path
 | 
			
		||||
 | 
			
		||||
    conf, path = config.load(
 | 
			
		||||
        conf_name='brokers',
 | 
			
		||||
        touch_if_dne=True,
 | 
			
		||||
    )
 | 
			
		||||
    section: dict|None = conf.get('deribit')
 | 
			
		||||
    section: dict = {}
 | 
			
		||||
    section = conf.get('deribit')
 | 
			
		||||
    if section is None:
 | 
			
		||||
        raise ValueError(
 | 
			
		||||
            f'No `[deribit]` section found in\n'
 | 
			
		||||
            f'{path!r}\n\n'
 | 
			
		||||
            f'See the template config from the core repo for samples..\n'
 | 
			
		||||
            # f'<TODO put repo link here??>'
 | 
			
		||||
        )
 | 
			
		||||
        log.warning(f'No config section found for deribit in {path}')
 | 
			
		||||
        return {}
 | 
			
		||||
 | 
			
		||||
    conf_option = section.get('option', {})
 | 
			
		||||
    conf_log = conf_option.get('log', {})
 | 
			
		||||
    return {
 | 
			
		||||
        'deribit': {
 | 
			
		||||
            'key_id': conf_option['key_id'],
 | 
			
		||||
            'key_secret': conf_option['key_secret'],
 | 
			
		||||
        },
 | 
			
		||||
        'log': {
 | 
			
		||||
            'filename': conf_log['filename'],
 | 
			
		||||
            'level': conf_log['level'],
 | 
			
		||||
            'disabled': conf_log['disabled'],
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
    section.clear # clear the dict to reuse it
 | 
			
		||||
    section['deribit'] = {}
 | 
			
		||||
    section['deribit']['key_id'] = conf_option.get('api_key')
 | 
			
		||||
    section['deribit']['key_secret'] = conf_option.get('api_secret')
 | 
			
		||||
 | 
			
		||||
    section['log'] = {}
 | 
			
		||||
    section['log']['filename'] = 'feedhandler.log'
 | 
			
		||||
    section['log']['level'] = 'DEBUG'
 | 
			
		||||
 | 
			
		||||
    return section
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class Client:
 | 
			
		||||
    '''
 | 
			
		||||
    Hi-level interface for the jsron-RPC over websocket API.
 | 
			
		||||
 | 
			
		||||
    '''
 | 
			
		||||
    def __init__(
 | 
			
		||||
        self,
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -271,12 +223,8 @@ class Client:
 | 
			
		|||
        self._auth_ts = None
 | 
			
		||||
        self._auth_renew_ts = 5 # seconds to renew auth
 | 
			
		||||
 | 
			
		||||
    async def _json_rpc_auth_wrapper(
 | 
			
		||||
        self,
 | 
			
		||||
        *args,
 | 
			
		||||
        **kwargs,
 | 
			
		||||
    ) -> JSONRPCResult:
 | 
			
		||||
 | 
			
		||||
    async def _json_rpc_auth_wrapper(self, *args, **kwargs) -> JSONRPCResult:
 | 
			
		||||
        
 | 
			
		||||
        """Background task that adquires a first access token and then will
 | 
			
		||||
        refresh the access token.
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -302,6 +250,9 @@ class Client:
 | 
			
		|||
 | 
			
		||||
        return await self.json_rpc(*args, **kwargs)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
    async def get_balances(
 | 
			
		||||
        self,
 | 
			
		||||
        kind: str = 'option'
 | 
			
		||||
| 
						 | 
				
			
			@ -321,44 +272,28 @@ class Client:
 | 
			
		|||
 | 
			
		||||
        return balances
 | 
			
		||||
 | 
			
		||||
    async def get_currencies(
 | 
			
		||||
        self,
 | 
			
		||||
 | 
			
		||||
    ) -> list[dict]:
 | 
			
		||||
        '''
 | 
			
		||||
        Return the set of currencies for deribit.
 | 
			
		||||
        '''
 | 
			
		||||
        assets = {}
 | 
			
		||||
        resp = await self._json_rpc_auth_wrapper(
 | 
			
		||||
            'public/get_currencies',
 | 
			
		||||
            params={}
 | 
			
		||||
        )
 | 
			
		||||
        return resp.result
 | 
			
		||||
 | 
			
		||||
    async def get_assets(
 | 
			
		||||
        self,
 | 
			
		||||
        venue: str | None = None,
 | 
			
		||||
 | 
			
		||||
    ) -> dict[str, Asset]:
 | 
			
		||||
        '''
 | 
			
		||||
        Return the set of asset balances for this account
 | 
			
		||||
        by (deribit's) symbol.
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
        '''
 | 
			
		||||
        """Return the set of asset balances for this account
 | 
			
		||||
        by symbol.
 | 
			
		||||
        """
 | 
			
		||||
        assets = {}
 | 
			
		||||
        currencies = await self.get_currencies()
 | 
			
		||||
        resp = await self._json_rpc_auth_wrapper(
 | 
			
		||||
            'public/get_currencies',
 | 
			
		||||
            params={}
 | 
			
		||||
        )
 | 
			
		||||
        currencies = resp.result
 | 
			
		||||
        for currency in currencies:
 | 
			
		||||
            name: str = currency['currency']
 | 
			
		||||
            tx_tick: Decimal = digits_to_dec(currency['fee_precision'])
 | 
			
		||||
 | 
			
		||||
            # TODO, handling of options, futures, perps etc. more
 | 
			
		||||
            # specifically with diff `.atype`s?
 | 
			
		||||
            name = currency['currency']
 | 
			
		||||
            tx_tick = digits_to_dec(currency['fee_precision']) 
 | 
			
		||||
            atype='crypto_currency'
 | 
			
		||||
            assets[name] = Asset(
 | 
			
		||||
                name=name,
 | 
			
		||||
                atype='crypto_currency',
 | 
			
		||||
                tx_tick=tx_tick,
 | 
			
		||||
            )
 | 
			
		||||
                atype=atype,
 | 
			
		||||
                tx_tick=tx_tick)
 | 
			
		||||
 | 
			
		||||
            instruments = await self.symbol_info(currency=name)
 | 
			
		||||
            for instrument in instruments:
 | 
			
		||||
| 
						 | 
				
			
			@ -366,10 +301,9 @@ class Client:
 | 
			
		|||
                assets[pair.symbol] = Asset(
 | 
			
		||||
                    name=pair.symbol,
 | 
			
		||||
                    atype=pair.venue,
 | 
			
		||||
                    tx_tick=pair.size_tick,
 | 
			
		||||
                )
 | 
			
		||||
                    tx_tick=pair.size_tick)
 | 
			
		||||
 | 
			
		||||
        return assets
 | 
			
		||||
        return assets 
 | 
			
		||||
 | 
			
		||||
    async def get_mkt_pairs(self) -> dict[str, Pair]:
 | 
			
		||||
        flat: dict[str, Pair] = {}
 | 
			
		||||
| 
						 | 
				
			
			@ -424,19 +358,6 @@ class Client:
 | 
			
		|||
            return cached_pair
 | 
			
		||||
 | 
			
		||||
        if sym:
 | 
			
		||||
            opt: OptionPair|None = pair_table.get(sym)
 | 
			
		||||
            if not opt:
 | 
			
		||||
                closest_matches: dict[str, Pair] = match_from_pairs(
 | 
			
		||||
                    pairs=pair_table,
 | 
			
		||||
                    query=sym,
 | 
			
		||||
                    score_cutoff=40,
 | 
			
		||||
                )
 | 
			
		||||
                closest_syms: list[str] = list(closest_matches.keys())
 | 
			
		||||
                raise ValueError(
 | 
			
		||||
                    f'No contract found for {sym!r}\n\n'
 | 
			
		||||
                    f'Closest {len(closest_syms)} available contracts:\n\n'
 | 
			
		||||
                    f'{pformat(closest_syms)}\n'
 | 
			
		||||
                )
 | 
			
		||||
            return pair_table[sym]
 | 
			
		||||
        else:
 | 
			
		||||
            return self._pairs
 | 
			
		||||
| 
						 | 
				
			
			@ -460,7 +381,7 @@ class Client:
 | 
			
		|||
        params: dict[str, str] = {
 | 
			
		||||
            'currency': currency.upper(),
 | 
			
		||||
            'kind': kind,
 | 
			
		||||
            'expired': expired,
 | 
			
		||||
            'expired': str(expired).lower()
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        resp: JSONRPCResult = await self._json_rpc_auth_wrapper(
 | 
			
		||||
| 
						 | 
				
			
			@ -468,9 +389,9 @@ class Client:
 | 
			
		|||
            params,
 | 
			
		||||
        )
 | 
			
		||||
        # convert to symbol-keyed table
 | 
			
		||||
        pair_type: Pair = PAIRTYPES[kind]
 | 
			
		||||
        pair_type: Type = PAIRTYPES[kind]
 | 
			
		||||
        results: list[dict] | None = resp.result
 | 
			
		||||
 | 
			
		||||
        
 | 
			
		||||
        instruments: dict[str, Pair] = {}
 | 
			
		||||
        for item in results:
 | 
			
		||||
            symbol=item['instrument_name'].lower()
 | 
			
		||||
| 
						 | 
				
			
			@ -506,15 +427,12 @@ class Client:
 | 
			
		|||
        mkt_pairs = await self.symbol_info()
 | 
			
		||||
 | 
			
		||||
        if not mkt_pairs:
 | 
			
		||||
            raise SymbolNotFound(
 | 
			
		||||
                f'No market pairs found!?:\n'
 | 
			
		||||
                f'{mkt_pairs}'
 | 
			
		||||
            )
 | 
			
		||||
            raise SymbolNotFound(f'No market pairs found!?:\n{resp}')
 | 
			
		||||
 | 
			
		||||
        pairs_view_subtable: dict[str, Pair] = {}
 | 
			
		||||
 | 
			
		||||
        for instrument in mkt_pairs:
 | 
			
		||||
            pair_type: Pair|OptionPair = PAIRTYPES[venue]
 | 
			
		||||
            pair_type: Type = PAIRTYPES[venue]
 | 
			
		||||
 | 
			
		||||
            pair: Pair = pair_type(**mkt_pairs[instrument].to_dict())
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -562,14 +480,12 @@ class Client:
 | 
			
		|||
        if end_dt is None:
 | 
			
		||||
            end_dt = now('UTC')
 | 
			
		||||
 | 
			
		||||
        _orig_start_dt = start_dt
 | 
			
		||||
        if start_dt is None:
 | 
			
		||||
            start_dt = end_dt.start_of(
 | 
			
		||||
                'minute'
 | 
			
		||||
            ).subtract(minutes=limit)
 | 
			
		||||
                'minute').subtract(minutes=limit)
 | 
			
		||||
 | 
			
		||||
        start_time: int = deribit_timestamp(start_dt)
 | 
			
		||||
        end_time: int = deribit_timestamp(end_dt)
 | 
			
		||||
        start_time = deribit_timestamp(start_dt)
 | 
			
		||||
        end_time = deribit_timestamp(end_dt)
 | 
			
		||||
 | 
			
		||||
        # https://docs.deribit.com/#public-get_tradingview_chart_data
 | 
			
		||||
        resp = await self._json_rpc_auth_wrapper(
 | 
			
		||||
| 
						 | 
				
			
			@ -583,13 +499,9 @@ class Client:
 | 
			
		|||
 | 
			
		||||
        result = KLinesResult(**resp.result)
 | 
			
		||||
        new_bars: list[tuple] = []
 | 
			
		||||
        # if _orig_start_dt is None:
 | 
			
		||||
        # if not new_bars:
 | 
			
		||||
        #     import tractor
 | 
			
		||||
        #     await tractor.pause()
 | 
			
		||||
 | 
			
		||||
        for i in range(len(result.close)):
 | 
			
		||||
            row = [
 | 
			
		||||
 | 
			
		||||
            row = [ 
 | 
			
		||||
                (start_time + (i * (60 * 1000))) / 1000.0,  # time
 | 
			
		||||
                result.open[i],
 | 
			
		||||
                result.high[i],
 | 
			
		||||
| 
						 | 
				
			
			@ -642,7 +554,7 @@ async def get_client(
 | 
			
		|||
 | 
			
		||||
 | 
			
		||||
@acm
 | 
			
		||||
async def open_feed_handler() -> FeedHandler:
 | 
			
		||||
async def open_feed_handler():
 | 
			
		||||
    fh = FeedHandler(config=get_config())
 | 
			
		||||
    yield fh
 | 
			
		||||
    await to_asyncio.run_task(fh.stop_async)
 | 
			
		||||
| 
						 | 
				
			
			@ -663,37 +575,43 @@ async def aio_price_feed_relay(
 | 
			
		|||
    from_trio: asyncio.Queue,
 | 
			
		||||
    to_trio: trio.abc.SendChannel,
 | 
			
		||||
) -> None:
 | 
			
		||||
    '''
 | 
			
		||||
    Relay price feed quotes from the `cryptofeed.FeedHandler` to
 | 
			
		||||
    the `piker`-side `trio.task` consumers for delivery to consumer
 | 
			
		||||
    sub-actors for various subsystems.
 | 
			
		||||
 | 
			
		||||
    '''
 | 
			
		||||
    async def _trade(
 | 
			
		||||
        trade: Trade,  # cryptofeed, NOT ours from `.venues`!
 | 
			
		||||
        receipt_timestamp: int,
 | 
			
		||||
    ) -> None:
 | 
			
		||||
        '''
 | 
			
		||||
        Proxy-thru `cryptofeed.FeedHandler` "trades" to `piker`-side.
 | 
			
		||||
 | 
			
		||||
        '''
 | 
			
		||||
        to_trio.send_nowait(('trade', trade))
 | 
			
		||||
 | 
			
		||||
    async def _l1(
 | 
			
		||||
        book: L1Book,
 | 
			
		||||
        receipt_timestamp: int,
 | 
			
		||||
    ) -> None:
 | 
			
		||||
        '''
 | 
			
		||||
        Relay-thru "l1 book" updates.
 | 
			
		||||
 | 
			
		||||
        '''
 | 
			
		||||
 | 
			
		||||
        to_trio.send_nowait(('l1', book))
 | 
			
		||||
 | 
			
		||||
        # TODO, make this work!
 | 
			
		||||
        # -[ ] why isn't this working in `tractor.pause_from_sync()`??
 | 
			
		||||
        # breakpoint()
 | 
			
		||||
    async def _trade(data: dict, receipt_timestamp):
 | 
			
		||||
        to_trio.send_nowait(('trade', {
 | 
			
		||||
            'symbol': cb_sym_to_deribit_inst(
 | 
			
		||||
                str_to_cb_sym(data.symbol)).lower(),
 | 
			
		||||
            'last': data,
 | 
			
		||||
            'broker_ts': time.time(),
 | 
			
		||||
            'data': data.to_dict(),
 | 
			
		||||
            'receipt': receipt_timestamp
 | 
			
		||||
        }))
 | 
			
		||||
 | 
			
		||||
    async def _l1(data: dict, receipt_timestamp):
 | 
			
		||||
        to_trio.send_nowait(('l1', {
 | 
			
		||||
            'symbol': cb_sym_to_deribit_inst(
 | 
			
		||||
                str_to_cb_sym(data.symbol)).lower(),
 | 
			
		||||
            'ticks': [
 | 
			
		||||
                {
 | 
			
		||||
                    'type': 'bid',
 | 
			
		||||
                    'price': float(data.bid_price),
 | 
			
		||||
                    'size': float(data.bid_size)
 | 
			
		||||
                },
 | 
			
		||||
                {
 | 
			
		||||
                    'type': 'bsize',
 | 
			
		||||
                    'price': float(data.bid_price),
 | 
			
		||||
                    'size': float(data.bid_size)
 | 
			
		||||
                },
 | 
			
		||||
                {
 | 
			
		||||
                    'type': 'ask',
 | 
			
		||||
                    'price': float(data.ask_price),
 | 
			
		||||
                    'size': float(data.ask_size)
 | 
			
		||||
                },
 | 
			
		||||
                {
 | 
			
		||||
                    'type': 'asize',
 | 
			
		||||
                    'price': float(data.ask_price),
 | 
			
		||||
                    'size': float(data.ask_size)
 | 
			
		||||
                }
 | 
			
		||||
            ]
 | 
			
		||||
        }))
 | 
			
		||||
    sym: Symbol = piker_sym_to_cb_sym(instrument)
 | 
			
		||||
    fh.add_feed(
 | 
			
		||||
        DERIBIT,
 | 
			
		||||
| 
						 | 
				
			
			@ -707,35 +625,27 @@ async def aio_price_feed_relay(
 | 
			
		|||
    if not fh.running:
 | 
			
		||||
        fh.run(
 | 
			
		||||
            start_loop=False,
 | 
			
		||||
            install_signal_handlers=False
 | 
			
		||||
        )
 | 
			
		||||
            install_signal_handlers=False)
 | 
			
		||||
 | 
			
		||||
    # sync with trio
 | 
			
		||||
    to_trio.send_nowait(None)
 | 
			
		||||
 | 
			
		||||
    # run until cancelled
 | 
			
		||||
    await asyncio.sleep(float('inf'))
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@acm
 | 
			
		||||
async def open_price_feed(
 | 
			
		||||
    instrument: str
 | 
			
		||||
) -> to_asyncio.LinkedTaskChannel:
 | 
			
		||||
 | 
			
		||||
    fh: FeedHandler
 | 
			
		||||
    first: None
 | 
			
		||||
    chan: to_asyncio.LinkedTaskChannel
 | 
			
		||||
    async with (
 | 
			
		||||
        maybe_open_feed_handler() as fh,
 | 
			
		||||
        to_asyncio.open_channel_from(
 | 
			
		||||
) -> trio.abc.ReceiveStream:
 | 
			
		||||
    async with maybe_open_feed_handler() as fh:
 | 
			
		||||
        async with to_asyncio.open_channel_from(
 | 
			
		||||
            partial(
 | 
			
		||||
                aio_price_feed_relay,
 | 
			
		||||
                fh,
 | 
			
		||||
                instrument
 | 
			
		||||
            )
 | 
			
		||||
        ) as (first, chan)
 | 
			
		||||
    ):
 | 
			
		||||
        yield chan
 | 
			
		||||
        ) as (first, chan):
 | 
			
		||||
            yield chan
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@acm
 | 
			
		||||
| 
						 | 
				
			
			@ -744,7 +654,6 @@ async def maybe_open_price_feed(
 | 
			
		|||
) -> trio.abc.ReceiveStream:
 | 
			
		||||
 | 
			
		||||
    # TODO: add a predicate to maybe_open_context
 | 
			
		||||
    feed: to_asyncio.LinkedTaskChannel
 | 
			
		||||
    async with maybe_open_context(
 | 
			
		||||
        acm_func=open_price_feed,
 | 
			
		||||
        kwargs={
 | 
			
		||||
| 
						 | 
				
			
			@ -759,69 +668,68 @@ async def maybe_open_price_feed(
 | 
			
		|||
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
# TODO, move all to `.broker` submod!
 | 
			
		||||
# async def aio_order_feed_relay(
 | 
			
		||||
#     fh: FeedHandler,
 | 
			
		||||
#     instrument: Symbol,
 | 
			
		||||
#     from_trio: asyncio.Queue,
 | 
			
		||||
#     to_trio: trio.abc.SendChannel,
 | 
			
		||||
# ) -> None:
 | 
			
		||||
#     async def _fill(data: dict, receipt_timestamp):
 | 
			
		||||
#         breakpoint()
 | 
			
		||||
async def aio_order_feed_relay(
 | 
			
		||||
    fh: FeedHandler,
 | 
			
		||||
    instrument: Symbol,
 | 
			
		||||
    from_trio: asyncio.Queue,
 | 
			
		||||
    to_trio: trio.abc.SendChannel,
 | 
			
		||||
) -> None:
 | 
			
		||||
    async def _fill(data: dict, receipt_timestamp):
 | 
			
		||||
        breakpoint()
 | 
			
		||||
 | 
			
		||||
#     async def _order_info(data: dict, receipt_timestamp):
 | 
			
		||||
#         breakpoint()
 | 
			
		||||
    async def _order_info(data: dict, receipt_timestamp):
 | 
			
		||||
        breakpoint()
 | 
			
		||||
 | 
			
		||||
#     fh.add_feed(
 | 
			
		||||
#         DERIBIT,
 | 
			
		||||
#         channels=[FILLS, ORDER_INFO],
 | 
			
		||||
#         symbols=[instrument.upper()],
 | 
			
		||||
#         callbacks={
 | 
			
		||||
#             FILLS: _fill,
 | 
			
		||||
#             ORDER_INFO: _order_info,
 | 
			
		||||
#         })
 | 
			
		||||
    fh.add_feed(
 | 
			
		||||
        DERIBIT,
 | 
			
		||||
        channels=[FILLS, ORDER_INFO],
 | 
			
		||||
        symbols=[instrument.upper()],
 | 
			
		||||
        callbacks={
 | 
			
		||||
            FILLS: _fill,
 | 
			
		||||
            ORDER_INFO: _order_info,
 | 
			
		||||
        })
 | 
			
		||||
 | 
			
		||||
#     if not fh.running:
 | 
			
		||||
#         fh.run(
 | 
			
		||||
#             start_loop=False,
 | 
			
		||||
#             install_signal_handlers=False)
 | 
			
		||||
    if not fh.running:
 | 
			
		||||
        fh.run(
 | 
			
		||||
            start_loop=False,
 | 
			
		||||
            install_signal_handlers=False)
 | 
			
		||||
 | 
			
		||||
#     # sync with trio
 | 
			
		||||
#     to_trio.send_nowait(None)
 | 
			
		||||
    # sync with trio
 | 
			
		||||
    to_trio.send_nowait(None)
 | 
			
		||||
 | 
			
		||||
#     await asyncio.sleep(float('inf'))
 | 
			
		||||
    await asyncio.sleep(float('inf'))
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
# @acm
 | 
			
		||||
# async def open_order_feed(
 | 
			
		||||
#     instrument: list[str]
 | 
			
		||||
# ) -> trio.abc.ReceiveStream:
 | 
			
		||||
#     async with maybe_open_feed_handler() as fh:
 | 
			
		||||
#         async with to_asyncio.open_channel_from(
 | 
			
		||||
#             partial(
 | 
			
		||||
#                 aio_order_feed_relay,
 | 
			
		||||
#                 fh,
 | 
			
		||||
#                 instrument
 | 
			
		||||
#             )
 | 
			
		||||
#         ) as (first, chan):
 | 
			
		||||
#             yield chan
 | 
			
		||||
@acm
 | 
			
		||||
async def open_order_feed(
 | 
			
		||||
    instrument: list[str]
 | 
			
		||||
) -> trio.abc.ReceiveStream:
 | 
			
		||||
    async with maybe_open_feed_handler() as fh:
 | 
			
		||||
        async with to_asyncio.open_channel_from(
 | 
			
		||||
            partial(
 | 
			
		||||
                aio_order_feed_relay,
 | 
			
		||||
                fh,
 | 
			
		||||
                instrument
 | 
			
		||||
            )
 | 
			
		||||
        ) as (first, chan):
 | 
			
		||||
            yield chan
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
# @acm
 | 
			
		||||
# async def maybe_open_order_feed(
 | 
			
		||||
#     instrument: str
 | 
			
		||||
# ) -> trio.abc.ReceiveStream:
 | 
			
		||||
@acm
 | 
			
		||||
async def maybe_open_order_feed(
 | 
			
		||||
    instrument: str
 | 
			
		||||
) -> trio.abc.ReceiveStream:
 | 
			
		||||
 | 
			
		||||
#     # TODO: add a predicate to maybe_open_context
 | 
			
		||||
#     async with maybe_open_context(
 | 
			
		||||
#         acm_func=open_order_feed,
 | 
			
		||||
#         kwargs={
 | 
			
		||||
#             'instrument': instrument.split('.')[0],
 | 
			
		||||
#             'fh': fh
 | 
			
		||||
#         },
 | 
			
		||||
#         key=f'{instrument.split('.')[0]}-order',
 | 
			
		||||
#     ) as (cache_hit, feed):
 | 
			
		||||
#         if cache_hit:
 | 
			
		||||
#             yield broadcast_receiver(feed, 10)
 | 
			
		||||
#         else:
 | 
			
		||||
#             yield feed
 | 
			
		||||
    # TODO: add a predicate to maybe_open_context
 | 
			
		||||
    async with maybe_open_context(
 | 
			
		||||
        acm_func=open_order_feed,
 | 
			
		||||
        kwargs={
 | 
			
		||||
            'instrument': instrument.split('.')[0],
 | 
			
		||||
            'fh': fh
 | 
			
		||||
        },
 | 
			
		||||
        key=f'{instrument.split('.')[0]}-order',
 | 
			
		||||
    ) as (cache_hit, feed):
 | 
			
		||||
        if cache_hit:
 | 
			
		||||
            yield broadcast_receiver(feed, 10)
 | 
			
		||||
        else:
 | 
			
		||||
            yield feed
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -18,58 +18,56 @@
 | 
			
		|||
Deribit backend.
 | 
			
		||||
 | 
			
		||||
'''
 | 
			
		||||
from __future__ import annotations
 | 
			
		||||
from contextlib import asynccontextmanager as acm
 | 
			
		||||
from datetime import datetime
 | 
			
		||||
from typing import (
 | 
			
		||||
    # Any,
 | 
			
		||||
    # Optional,
 | 
			
		||||
    Callable,
 | 
			
		||||
)
 | 
			
		||||
# from pprint import pformat
 | 
			
		||||
from typing import Any, Optional, Callable
 | 
			
		||||
from pprint import pformat
 | 
			
		||||
import time
 | 
			
		||||
 | 
			
		||||
import cryptofeed
 | 
			
		||||
import trio
 | 
			
		||||
from trio_typing import TaskStatus
 | 
			
		||||
from pendulum import (
 | 
			
		||||
    from_timestamp,
 | 
			
		||||
    now,
 | 
			
		||||
)
 | 
			
		||||
from rapidfuzz import process as fuzzy
 | 
			
		||||
import numpy as np
 | 
			
		||||
import tractor
 | 
			
		||||
 | 
			
		||||
from piker.accounting import (
 | 
			
		||||
    Asset,
 | 
			
		||||
    MktPair,
 | 
			
		||||
    unpack_fqme,
 | 
			
		||||
)
 | 
			
		||||
from piker.brokers import (
 | 
			
		||||
    open_cached_client,
 | 
			
		||||
    NoData,
 | 
			
		||||
    DataUnavailable,
 | 
			
		||||
)
 | 
			
		||||
from piker._cacheables import (
 | 
			
		||||
    async_lifo_cache,
 | 
			
		||||
)
 | 
			
		||||
from piker.log import (
 | 
			
		||||
    get_logger,
 | 
			
		||||
    mk_repr,
 | 
			
		||||
)
 | 
			
		||||
from piker.log import get_logger, get_console_log
 | 
			
		||||
from piker.data import ShmArray
 | 
			
		||||
from piker.data.validate import FeedInit
 | 
			
		||||
from piker.brokers._util import (
 | 
			
		||||
    BrokerError,
 | 
			
		||||
    DataUnavailable,
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
from cryptofeed import FeedHandler
 | 
			
		||||
from cryptofeed.defines import (
 | 
			
		||||
    DERIBIT, L1_BOOK, TRADES, OPTION, CALL, PUT
 | 
			
		||||
)
 | 
			
		||||
from cryptofeed.symbols import Symbol
 | 
			
		||||
 | 
			
		||||
from .api import (
 | 
			
		||||
    Client,
 | 
			
		||||
    # get_config,
 | 
			
		||||
    piker_sym_to_cb_sym,
 | 
			
		||||
    cb_sym_to_deribit_inst,
 | 
			
		||||
    str_to_cb_sym,
 | 
			
		||||
    Client, Trade,
 | 
			
		||||
    get_config,
 | 
			
		||||
    piker_sym_to_cb_sym, cb_sym_to_deribit_inst,
 | 
			
		||||
    maybe_open_price_feed
 | 
			
		||||
)
 | 
			
		||||
from .venues import (
 | 
			
		||||
    Pair,
 | 
			
		||||
    OptionPair,
 | 
			
		||||
    Trade,
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
_spawn_kwargs = {
 | 
			
		||||
| 
						 | 
				
			
			@ -88,10 +86,6 @@ async def open_history_client(
 | 
			
		|||
    # TODO implement history getter for the new storage layer.
 | 
			
		||||
    async with open_cached_client('deribit') as client:
 | 
			
		||||
 | 
			
		||||
        pair: OptionPair = client._pairs[mkt.dst.name]
 | 
			
		||||
        # XXX NOTE, the cuckers use ms !!!
 | 
			
		||||
        creation_time_s: int = pair.creation_timestamp/1000
 | 
			
		||||
 | 
			
		||||
        async def get_ohlc(
 | 
			
		||||
            timeframe: float,
 | 
			
		||||
            end_dt: datetime | None = None,
 | 
			
		||||
| 
						 | 
				
			
			@ -111,31 +105,6 @@ async def open_history_client(
 | 
			
		|||
                end_dt=end_dt,
 | 
			
		||||
            )
 | 
			
		||||
            if len(array) == 0:
 | 
			
		||||
                if (
 | 
			
		||||
                    end_dt is None
 | 
			
		||||
                ):
 | 
			
		||||
                    raise DataUnavailable(
 | 
			
		||||
                        'No history seems to exist yet?\n\n'
 | 
			
		||||
                        f'{mkt}'
 | 
			
		||||
                    )
 | 
			
		||||
                elif (
 | 
			
		||||
                    end_dt
 | 
			
		||||
                    and
 | 
			
		||||
                    end_dt.timestamp() < creation_time_s
 | 
			
		||||
                ):
 | 
			
		||||
                    # the contract can't have history
 | 
			
		||||
                    # before it was created.
 | 
			
		||||
                    pair_type_str: str = type(pair).__name__
 | 
			
		||||
                    create_dt: datetime = from_timestamp(creation_time_s)
 | 
			
		||||
                    raise DataUnavailable(
 | 
			
		||||
                        f'No history prior to\n'
 | 
			
		||||
                        f'`{pair_type_str}.creation_timestamp: int = '
 | 
			
		||||
                        f'{pair.creation_timestamp}\n\n'
 | 
			
		||||
                        f'------ deribit sux ------\n'
 | 
			
		||||
                        f'WHICH IN "NORMAL PEOPLE WHO USE EPOCH TIME" form is,\n'
 | 
			
		||||
                        f'creation_time_s: {creation_time_s}\n'
 | 
			
		||||
                        f'create_dt: {create_dt}\n'
 | 
			
		||||
                    )
 | 
			
		||||
                raise NoData(
 | 
			
		||||
                    f'No frame for {start_dt} -> {end_dt}\n'
 | 
			
		||||
                )
 | 
			
		||||
| 
						 | 
				
			
			@ -157,20 +126,14 @@ async def open_history_client(
 | 
			
		|||
 | 
			
		||||
            return array, start_dt, end_dt
 | 
			
		||||
 | 
			
		||||
        yield (
 | 
			
		||||
            get_ohlc,
 | 
			
		||||
            {  # backfill config
 | 
			
		||||
                'erlangs': 3,
 | 
			
		||||
                'rate': 3,
 | 
			
		||||
            }
 | 
			
		||||
        )
 | 
			
		||||
        yield get_ohlc, {'erlangs': 3, 'rate': 3}
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@async_lifo_cache()
 | 
			
		||||
async def get_mkt_info(
 | 
			
		||||
    fqme: str,
 | 
			
		||||
 | 
			
		||||
) -> tuple[MktPair, Pair|OptionPair] | None:
 | 
			
		||||
) -> tuple[MktPair, Pair] | None:
 | 
			
		||||
 | 
			
		||||
    # uppercase since kraken bs_mktid is always upper
 | 
			
		||||
    if 'deribit' not in fqme.lower():
 | 
			
		||||
| 
						 | 
				
			
			@ -186,7 +149,7 @@ async def get_mkt_info(
 | 
			
		|||
    # returns, always!
 | 
			
		||||
    expiry: str = expiry.upper()
 | 
			
		||||
    venue: str = venue.upper()
 | 
			
		||||
    # venue_lower: str = venue.lower()
 | 
			
		||||
    venue_lower: str = venue.lower()
 | 
			
		||||
 | 
			
		||||
    mkt_mode: str = 'option'
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -212,88 +175,64 @@ async def get_mkt_info(
 | 
			
		|||
            price_tick=pair.price_tick,
 | 
			
		||||
            size_tick=pair.size_tick,
 | 
			
		||||
            bs_mktid=pair.symbol,
 | 
			
		||||
            expiry=pair.expiry,
 | 
			
		||||
            venue=mkt_mode,
 | 
			
		||||
            broker='deribit',
 | 
			
		||||
            _atype=mkt_mode,
 | 
			
		||||
            _fqme_without_src=True,
 | 
			
		||||
 | 
			
		||||
            # expiry=pair.expiry,
 | 
			
		||||
            # XXX TODO, currently we don't use it since it's
 | 
			
		||||
            # already "described" in the `OptionPair.symbol: str`
 | 
			
		||||
            # and if we slap in the ISO repr it's kinda hideous..
 | 
			
		||||
            # -[ ] figure out the best either std
 | 
			
		||||
        )
 | 
			
		||||
        return mkt, pair
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
async def stream_quotes(
 | 
			
		||||
 | 
			
		||||
    send_chan: trio.abc.SendChannel,
 | 
			
		||||
    symbols: list[str],
 | 
			
		||||
    feed_is_live: trio.Event,
 | 
			
		||||
    loglevel: str = None,
 | 
			
		||||
 | 
			
		||||
    # startup sync
 | 
			
		||||
    task_status: TaskStatus[tuple[dict, dict]] = trio.TASK_STATUS_IGNORED,
 | 
			
		||||
 | 
			
		||||
) -> None:
 | 
			
		||||
    '''
 | 
			
		||||
    Open a live quote stream for the market set defined by `symbols`.
 | 
			
		||||
    # XXX: required to propagate ``tractor`` loglevel to piker logging
 | 
			
		||||
    get_console_log(loglevel or tractor.current_actor().loglevel)
 | 
			
		||||
 | 
			
		||||
    Internally this starts a `cryptofeed.FeedHandler` inside an `asyncio`-side
 | 
			
		||||
    task and relays through L1 and `Trade` msgs here to our `trio.Task`.
 | 
			
		||||
 | 
			
		||||
    '''
 | 
			
		||||
    sym = symbols[0].split('.')[0]
 | 
			
		||||
    init_msgs: list[FeedInit] = []
 | 
			
		||||
 | 
			
		||||
    # multiline nested `dict` formatter (since rn quote-msgs are
 | 
			
		||||
    # just that).
 | 
			
		||||
    pfmt: Callable[[str], str] = mk_repr(
 | 
			
		||||
        # so we can see `deribit`'s delightfully mega-long bs fields..
 | 
			
		||||
        maxstring=100,
 | 
			
		||||
    )
 | 
			
		||||
    init_msgs: list[FeedInit] = []
 | 
			
		||||
 | 
			
		||||
    async with (
 | 
			
		||||
        open_cached_client('deribit') as client,
 | 
			
		||||
        send_chan as send_chan
 | 
			
		||||
    ):
 | 
			
		||||
        mkt: MktPair
 | 
			
		||||
        pair: Pair
 | 
			
		||||
 | 
			
		||||
        mkt, pair = await get_mkt_info(sym)
 | 
			
		||||
 | 
			
		||||
        # build out init msgs according to latest spec
 | 
			
		||||
        init_msgs.append(
 | 
			
		||||
            FeedInit(
 | 
			
		||||
                mkt_info=mkt,
 | 
			
		||||
            )
 | 
			
		||||
            FeedInit(mkt_info=mkt)
 | 
			
		||||
        )
 | 
			
		||||
        # build `cryptofeed` feed-handle
 | 
			
		||||
        cf_sym: cryptofeed.Symbol = piker_sym_to_cb_sym(sym)
 | 
			
		||||
        nsym = piker_sym_to_cb_sym(sym)
 | 
			
		||||
 | 
			
		||||
        from_cf: tractor.to_asyncio.LinkedTaskChannel
 | 
			
		||||
        async with maybe_open_price_feed(sym) as from_cf:
 | 
			
		||||
        async with maybe_open_price_feed(sym) as stream:
 | 
			
		||||
 | 
			
		||||
            # load the "last trades" summary
 | 
			
		||||
            last_trades_res: cryptofeed.LastTradesResult = await client.last_trades(
 | 
			
		||||
                cb_sym_to_deribit_inst(cf_sym),
 | 
			
		||||
                count=1,
 | 
			
		||||
            )
 | 
			
		||||
            last_trades: list[Trade] = last_trades_res.trades
 | 
			
		||||
            cache = client._pairs
 | 
			
		||||
 | 
			
		||||
            # TODO, do we even need this or will the above always
 | 
			
		||||
            # work?
 | 
			
		||||
            # if not last_trades:
 | 
			
		||||
            #     await tractor.pause()
 | 
			
		||||
            #     async for typ, quote in from_cf:
 | 
			
		||||
            #         if typ == 'trade':
 | 
			
		||||
            #             last_trade = Trade(**(quote['data']))
 | 
			
		||||
            #             break
 | 
			
		||||
            last_trades = (await client.last_trades(
 | 
			
		||||
                cb_sym_to_deribit_inst(nsym), count=1)).trades
 | 
			
		||||
 | 
			
		||||
            # else:
 | 
			
		||||
            last_trade = Trade(
 | 
			
		||||
                **(last_trades[0])
 | 
			
		||||
            )
 | 
			
		||||
            if len(last_trades) == 0:
 | 
			
		||||
                last_trade = None
 | 
			
		||||
                async for typ, quote in stream:
 | 
			
		||||
                    if typ == 'trade':
 | 
			
		||||
                        last_trade = Trade(**(quote['data']))
 | 
			
		||||
                        break
 | 
			
		||||
 | 
			
		||||
            first_quote: dict = {
 | 
			
		||||
            else:
 | 
			
		||||
                last_trade = Trade(**(last_trades[0]))
 | 
			
		||||
 | 
			
		||||
            first_quote = {
 | 
			
		||||
                'symbol': sym,
 | 
			
		||||
                'last': last_trade.price,
 | 
			
		||||
                'brokerd_ts': last_trade.timestamp,
 | 
			
		||||
| 
						 | 
				
			
			@ -304,84 +243,13 @@ async def stream_quotes(
 | 
			
		|||
                    'broker_ts': last_trade.timestamp
 | 
			
		||||
                }]
 | 
			
		||||
            }
 | 
			
		||||
            task_status.started((
 | 
			
		||||
                init_msgs,
 | 
			
		||||
                first_quote,
 | 
			
		||||
            ))
 | 
			
		||||
            task_status.started((init_msgs,  first_quote))
 | 
			
		||||
 | 
			
		||||
            feed_is_live.set()
 | 
			
		||||
 | 
			
		||||
            # NOTE XXX, static for now!
 | 
			
		||||
            # => since this only handles ONE mkt feed at a time we
 | 
			
		||||
            # don't need a lookup table to map interleaved quotes
 | 
			
		||||
            # from multiple possible mkt-pairs
 | 
			
		||||
            topic: str = mkt.bs_fqme
 | 
			
		||||
 | 
			
		||||
            # deliver until cancelled
 | 
			
		||||
            async for typ, ref in from_cf:
 | 
			
		||||
                match typ:
 | 
			
		||||
                    case 'trade':
 | 
			
		||||
                        trade: cryptofeed.types.Trade = ref
 | 
			
		||||
 | 
			
		||||
                        # TODO, re-impl this according to teh ideal
 | 
			
		||||
                        # fqme for opts that we choose!!
 | 
			
		||||
                        bs_fqme: str = cb_sym_to_deribit_inst(
 | 
			
		||||
                            str_to_cb_sym(trade.symbol)
 | 
			
		||||
                        ).lower()
 | 
			
		||||
 | 
			
		||||
                        piker_quote: dict = {
 | 
			
		||||
                            'symbol': bs_fqme,
 | 
			
		||||
                            'last': trade.price,
 | 
			
		||||
                            'broker_ts': time.time(),
 | 
			
		||||
                            # ^TODO, name this `brokerd/datad_ts` and
 | 
			
		||||
                            # use `time.time_ns()` ??
 | 
			
		||||
                            'ticks': [{
 | 
			
		||||
                                'type': 'trade',
 | 
			
		||||
                                'price': float(trade.price),
 | 
			
		||||
                                'size': float(trade.amount),
 | 
			
		||||
                                'broker_ts': trade.timestamp,
 | 
			
		||||
                            }],
 | 
			
		||||
                        }
 | 
			
		||||
                        log.info(
 | 
			
		||||
                            f'deribit {typ!r} quote for {sym!r}\n\n'
 | 
			
		||||
                            f'{trade}\n\n'
 | 
			
		||||
                            f'{pfmt(piker_quote)}\n'
 | 
			
		||||
                        )
 | 
			
		||||
 | 
			
		||||
                    case 'l1':
 | 
			
		||||
                        book: cryptofeed.types.L1Book = ref
 | 
			
		||||
 | 
			
		||||
                        # TODO, so this is where we can possibly change things
 | 
			
		||||
                        # and instead lever the `MktPair.bs_fqme: str` output?
 | 
			
		||||
                        bs_fqme: str = cb_sym_to_deribit_inst(
 | 
			
		||||
                            str_to_cb_sym(book.symbol)
 | 
			
		||||
                        ).lower()
 | 
			
		||||
 | 
			
		||||
                        piker_quote: dict = {
 | 
			
		||||
                            'symbol': bs_fqme,
 | 
			
		||||
                            'ticks': [
 | 
			
		||||
 | 
			
		||||
                                {'type': 'bid',
 | 
			
		||||
                                 'price': float(book.bid_price),
 | 
			
		||||
                                 'size': float(book.bid_size)},
 | 
			
		||||
 | 
			
		||||
                                {'type': 'bsize',
 | 
			
		||||
                                 'price': float(book.bid_price),
 | 
			
		||||
                                 'size': float(book.bid_size),},
 | 
			
		||||
 | 
			
		||||
                                {'type': 'ask',
 | 
			
		||||
                                 'price': float(book.ask_price),
 | 
			
		||||
                                 'size': float(book.ask_size),},
 | 
			
		||||
 | 
			
		||||
                                {'type': 'asize',
 | 
			
		||||
                                 'price': float(book.ask_price),
 | 
			
		||||
                                 'size': float(book.ask_size),}
 | 
			
		||||
                            ]
 | 
			
		||||
                        }
 | 
			
		||||
 | 
			
		||||
                await send_chan.send({
 | 
			
		||||
                    topic: piker_quote,
 | 
			
		||||
                })
 | 
			
		||||
            async for typ, quote in stream:
 | 
			
		||||
                topic = quote['symbol']
 | 
			
		||||
                await send_chan.send({topic: quote})
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@tractor.context
 | 
			
		||||
| 
						 | 
				
			
			@ -391,13 +259,13 @@ async def open_symbol_search(
 | 
			
		|||
    async with open_cached_client('deribit') as client:
 | 
			
		||||
 | 
			
		||||
        # load all symbols locally for fast search
 | 
			
		||||
        # cache = client._pairs
 | 
			
		||||
        cache = client._pairs
 | 
			
		||||
        await ctx.started()
 | 
			
		||||
 | 
			
		||||
        async with ctx.open_stream() as stream:
 | 
			
		||||
 | 
			
		||||
            pattern: str
 | 
			
		||||
            async for pattern in stream:
 | 
			
		||||
 | 
			
		||||
                # NOTE: pattern fuzzy-matching is done within
 | 
			
		||||
                # the methd impl.
 | 
			
		||||
                pairs: dict[str, Pair] = await client.search_symbols(
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -22,10 +22,11 @@ from __future__ import annotations
 | 
			
		|||
import pendulum
 | 
			
		||||
from typing import (
 | 
			
		||||
    Literal,
 | 
			
		||||
    Optional,
 | 
			
		||||
)
 | 
			
		||||
from decimal import Decimal
 | 
			
		||||
 | 
			
		||||
from msgspec import field
 | 
			
		||||
 | 
			
		||||
from piker.types import Struct
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -110,21 +111,18 @@ class OptionPair(Pair, frozen=True):
 | 
			
		|||
    block_trade_min_trade_amount: int # '25'
 | 
			
		||||
    block_trade_commission: float # '0.003'
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
    # NOTE: see `.data._symcache.SymbologyCache.load()` for why
 | 
			
		||||
    ns_path: str = 'piker.brokers.deribit:OptionPair'
 | 
			
		||||
 | 
			
		||||
    # TODO, impl this without the MM:SS part of
 | 
			
		||||
    # the `'THH:MM:SS..'` etc..
 | 
			
		||||
    @property
 | 
			
		||||
    def expiry(self) -> str:
 | 
			
		||||
        iso_date = pendulum.from_timestamp(
 | 
			
		||||
            self.expiration_timestamp / 1000
 | 
			
		||||
        ).isoformat()
 | 
			
		||||
        iso_date = pendulum.from_timestamp(self.expiration_timestamp / 1000).isoformat()
 | 
			
		||||
        return iso_date 
 | 
			
		||||
 | 
			
		||||
    @property
 | 
			
		||||
    def venue(self) -> str:
 | 
			
		||||
        return f'{self.instrument_type}_option'
 | 
			
		||||
        return 'option'
 | 
			
		||||
 | 
			
		||||
    @property
 | 
			
		||||
    def bs_fqme(self) -> str:
 | 
			
		||||
| 
						 | 
				
			
			@ -154,7 +152,6 @@ class JSONRPCResult(Struct):
 | 
			
		|||
    error: Optional[dict] = None
 | 
			
		||||
    result: Optional[list[dict]] = None
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class JSONRPCChannel(Struct):
 | 
			
		||||
    method: str
 | 
			
		||||
    params: dict
 | 
			
		||||
| 
						 | 
				
			
			@ -171,7 +168,6 @@ class KLinesResult(Struct):
 | 
			
		|||
    status: str
 | 
			
		||||
    volume: list[float]
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class Trade(Struct):
 | 
			
		||||
    iv: float
 | 
			
		||||
    price: float
 | 
			
		||||
| 
						 | 
				
			
			@ -190,7 +186,6 @@ class Trade(Struct):
 | 
			
		|||
    block_trade_id: Optional[str] = '',
 | 
			
		||||
    block_trade_leg_count: Optional[int] = 0,
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class LastTradesResult(Struct):
 | 
			
		||||
    trades: list[Trade]
 | 
			
		||||
    has_more: bool
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -434,32 +434,21 @@ async def start_backfill(
 | 
			
		|||
                # - some other unknown error (ib blocking the
 | 
			
		||||
                #   history bc they don't want you seeing how they
 | 
			
		||||
                #   cucked all the tinas..)
 | 
			
		||||
                if (
 | 
			
		||||
                    frame_types
 | 
			
		||||
                    and
 | 
			
		||||
                    (dur := frame_types.get(timeframe))
 | 
			
		||||
                ):
 | 
			
		||||
                    # decrement by a duration's (frame) worth of time
 | 
			
		||||
                    # as maybe indicated by the backend to see if we
 | 
			
		||||
                    # can get older data before this possible
 | 
			
		||||
                    # "history gap".
 | 
			
		||||
                    orig_last_start_dt = last_start_dt
 | 
			
		||||
                    last_start_dt = last_start_dt.subtract(
 | 
			
		||||
                if dur := frame_types.get(timeframe):
 | 
			
		||||
                    # decrement by a frame's worth of duration and
 | 
			
		||||
                    # retry a few times.
 | 
			
		||||
                    last_start_dt.subtract(
 | 
			
		||||
                        seconds=dur.total_seconds()
 | 
			
		||||
                    )
 | 
			
		||||
                    log.warning(
 | 
			
		||||
                        f'{mod.name} -> EMPTY FRAME for end_dt?\n'
 | 
			
		||||
                        f'tf@fqme: {timeframe}@{mkt.fqme}\n'
 | 
			
		||||
                        f'Decrementing `end_dt` by {dur} and retry..\n\n'
 | 
			
		||||
 | 
			
		||||
                        f'orig_last_start_dt: {orig_last_start_dt}\n'
 | 
			
		||||
                        f'dur subtracted last_start_dt: {last_start_dt}\n'
 | 
			
		||||
                        f'bf_until: {backfill_until_dt}\n'
 | 
			
		||||
                        'bf_until <- last_start_dt:\n'
 | 
			
		||||
                        f'{backfill_until_dt} <- {last_start_dt}\n'
 | 
			
		||||
                        f'Decrementing `end_dt` by {dur} and retry..\n'
 | 
			
		||||
                    )
 | 
			
		||||
                    continue
 | 
			
		||||
 | 
			
		||||
                raise
 | 
			
		||||
 | 
			
		||||
            # broker says there never was or is no more history to pull
 | 
			
		||||
            except DataUnavailable:
 | 
			
		||||
                log.warning(
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
		Reference in New Issue