diff --git a/config/brokers.toml b/config/brokers.toml index 18d67da5..bb57c78d 100644 --- a/config/brokers.toml +++ b/config/brokers.toml @@ -50,3 +50,8 @@ prefer_data_account = [ paper = "XX0000000" margin = "X0000000" ira = "X0000000" + + +[deribit] +key_id = 'XXXXXXXX' +key_secret = 'Xx_XxXxXxXxXxXxXxXxXxXxXxXxXxXxXxXxXxXxXxXx' diff --git a/piker/brokers/cli.py b/piker/brokers/cli.py index b614a4fd..0d84384d 100644 --- a/piker/brokers/cli.py +++ b/piker/brokers/cli.py @@ -39,6 +39,148 @@ _config_dir = click.get_app_dir('piker') _watchlists_data_path = os.path.join(_config_dir, 'watchlists.json') +OK = '\033[92m' +WARNING = '\033[93m' +FAIL = '\033[91m' +ENDC = '\033[0m' + + +def print_ok(s: str, **kwargs): + print(OK + s + ENDC, **kwargs) + + +def print_error(s: str, **kwargs): + print(FAIL + s + ENDC, **kwargs) + + +def get_method(client, meth_name: str): + print(f'checking client for method \'{meth_name}\'...', end='', flush=True) + method = getattr(client, meth_name, None) + assert method + print_ok('found!.') + return method + +async def run_method(client, meth_name: str, **kwargs): + method = get_method(client, meth_name) + print('running...', end='', flush=True) + result = await method(**kwargs) + print_ok(f'done! result: {type(result)}') + return result + +async def run_test(broker_name: str): + brokermod = get_brokermod(broker_name) + total = 0 + passed = 0 + failed = 0 + + print(f'getting client...', end='', flush=True) + if not hasattr(brokermod, 'get_client'): + print_error('fail! no \'get_client\' context manager found.') + return + + async with brokermod.get_client(is_brokercheck=True) as client: + print_ok(f'done! inside client context.') + + # check for methods present on brokermod + method_list = [ + 'backfill_bars', + 'get_client', + 'trades_dialogue', + 'open_history_client', + 'open_symbol_search', + 'stream_quotes', + + ] + + for method in method_list: + print( + f'checking brokermod for method \'{method}\'...', + end='', flush=True) + if not hasattr(brokermod, method): + print_error(f'fail! method \'{method}\' not found.') + failed += 1 + else: + print_ok('done!') + passed += 1 + + total += 1 + + # check for methods present con brokermod.Client and their + # results + + # for private methods only check is present + method_list = [ + 'get_balances', + 'get_assets', + 'get_trades', + 'get_xfers', + 'submit_limit', + 'submit_cancel', + 'search_symbols', + ] + + for method_name in method_list: + try: + get_method(client, method_name) + passed += 1 + + except AssertionError: + print_error(f'fail! method \'{method_name}\' not found.') + failed += 1 + + total += 1 + + + # check for methods present con brokermod.Client and their + # results + + syms = await run_method(client, 'symbol_info') + total += 1 + + if len(syms) == 0: + raise BaseException('Empty Symbol list?') + + passed += 1 + + first_sym = tuple(syms.keys())[0] + + method_list = [ + ('cache_symbols', {}), + ('search_symbols', {'pattern': first_sym[:-1]}), + ('bars', {'symbol': first_sym}) + ] + + for method_name, method_kwargs in method_list: + try: + await run_method(client, method_name, **method_kwargs) + passed += 1 + + except AssertionError: + print_error(f'fail! method \'{method_name}\' not found.') + failed += 1 + + total += 1 + + print(f'total: {total}, passed: {passed}, failed: {failed}') + + +@cli.command() +@click.argument('broker', nargs=1, required=True) +@click.pass_obj +def brokercheck(config, broker): + ''' + Test broker apis for completeness. + + ''' + async def bcheck_main(): + async with maybe_spawn_brokerd(broker) as portal: + await portal.run(run_test, broker) + await portal.cancel_actor() + + trio.run(run_test, broker) + + + @cli.command() @click.option('--keys', '-k', multiple=True, help='Return results only for these keys') @@ -193,6 +335,8 @@ def contracts(ctx, loglevel, broker, symbol, ids): brokermod = get_brokermod(broker) get_console_log(loglevel) + + contracts = trio.run(partial(core.contracts, brokermod, symbol)) if not ids: # just print out expiry dates which can be used with diff --git a/piker/brokers/deribit/README.rst b/piker/brokers/deribit/README.rst new file mode 100644 index 00000000..61cb0d4d --- /dev/null +++ b/piker/brokers/deribit/README.rst @@ -0,0 +1,70 @@ +``deribit`` backend +------------------ +pretty good liquidity crypto derivatives, uses custom json rpc over ws for +client methods, then `cryptofeed` for data streams. + +status +****** +- supports option charts +- no order support yet + + +config +****** +In order to get order mode support your ``brokers.toml`` +needs to have something like the following: + +.. code:: toml + + [deribit] + key_id = 'XXXXXXXX' + key_secret = 'Xx_XxXxXxXxXxXxXxXxXxXxXxXxXxXxXxXxXxXxXxXx' + +To obtain an api id and secret you need to create an account, which can be a +real market account over at: + + - deribit.com (requires KYC for deposit address) + +Or a testnet account over at: + + - test.deribit.com + +For testnet once the account is created here is how you deposit fake crypto to +try it out: + +1) Go to Wallet: + +.. figure:: assets/0_wallet.png + :align: center + :target: assets/0_wallet.png + :alt: wallet page + +2) Then click on the elipsis menu and select deposit + +.. figure:: assets/1_wallet_select_deposit.png + :align: center + :target: assets/1_wallet_select_deposit.png + :alt: wallet deposit page + +3) This will take you to the deposit address page + +.. figure:: assets/2_gen_deposit_addr.png + :align: center + :target: assets/2_gen_deposit_addr.png + :alt: generate deposit address page + +4) After clicking generate you should see the address, copy it and go to the +`coin faucet `_ and send fake +coins to that address. + +.. figure:: assets/3_deposit_address.png + :align: center + :target: assets/3_deposit_address.png + :alt: generated address + +5) Back in the deposit address page you should see the deposit in your history + +.. figure:: assets/4_wallet_deposit_history.png + :align: center + :target: assets/4_wallet_deposit_history.png + :alt: wallet deposit history diff --git a/piker/brokers/deribit/__init__.py b/piker/brokers/deribit/__init__.py new file mode 100644 index 00000000..f5c48b58 --- /dev/null +++ b/piker/brokers/deribit/__init__.py @@ -0,0 +1,65 @@ +# piker: trading gear for hackers +# Copyright (C) Guillermo Rodriguez (in stewardship for piker0) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +''' +Deribit backend. + +''' + +from piker.log import get_logger + +log = get_logger(__name__) + +from .api import ( + get_client, +) +from .feed import ( + open_history_client, + open_symbol_search, + stream_quotes, + backfill_bars +) +# from .broker import ( + # trades_dialogue, + # norm_trade_records, +# ) + +__all__ = [ + 'get_client', +# 'trades_dialogue', + 'open_history_client', + 'open_symbol_search', + 'stream_quotes', +# 'norm_trade_records', +] + + +# tractor RPC enable arg +__enable_modules__: list[str] = [ + 'api', + 'feed', +# 'broker', +] + +# passed to ``tractor.ActorNursery.start_actor()`` +_spawn_kwargs = { + 'infect_asyncio': True, +} + +# annotation to let backend agnostic code +# know if ``brokerd`` should be spawned with +# ``tractor``'s aio mode. +_infect_asyncio: bool = True diff --git a/piker/brokers/deribit/api.py b/piker/brokers/deribit/api.py new file mode 100644 index 00000000..a0361fa9 --- /dev/null +++ b/piker/brokers/deribit/api.py @@ -0,0 +1,716 @@ +# piker: trading gear for hackers +# Copyright (C) Guillermo Rodriguez (in stewardship for piker0) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +''' +Deribit backend. + +''' +import json +import time +import asyncio + +from contextlib import asynccontextmanager as acm, AsyncExitStack +from itertools import count +from functools import partial +from datetime import datetime +from typing import Any, Optional, Iterable, Callable + +import pendulum +import asks +import trio +from trio_typing import Nursery, TaskStatus +from fuzzywuzzy import process as fuzzy +import numpy as np + +from piker.data.types import Struct +from piker.data._web_bs import NoBsWs, open_autorecon_ws + +from .._util import resproc + +from piker import config +from piker.log import get_logger + +from tractor.trionics import ( + broadcast_receiver, + BroadcastReceiver, + maybe_open_context +) +from tractor import to_asyncio + +from cryptofeed import FeedHandler + +from cryptofeed.defines import ( + DERIBIT, + L1_BOOK, TRADES, + OPTION, CALL, PUT +) +from cryptofeed.symbols import Symbol + +log = get_logger(__name__) + + +_spawn_kwargs = { + 'infect_asyncio': True, +} + + +_url = 'https://www.deribit.com' +_ws_url = 'wss://www.deribit.com/ws/api/v2' +_testnet_ws_url = 'wss://test.deribit.com/ws/api/v2' + + +# Broker specific ohlc schema (rest) +_ohlc_dtype = [ + ('index', int), + ('time', int), + ('open', float), + ('high', float), + ('low', float), + ('close', float), + ('volume', float), + ('bar_wap', float), # will be zeroed by sampler if not filled +] + + +class JSONRPCResult(Struct): + jsonrpc: str = '2.0' + id: int + result: Optional[dict] = None + error: Optional[dict] = None + usIn: int + usOut: int + usDiff: int + testnet: bool + + +class KLinesResult(Struct): + close: list[float] + cost: list[float] + high: list[float] + low: list[float] + open: list[float] + status: str + ticks: list[int] + volume: list[float] + +class Trade(Struct): + trade_seq: int + trade_id: str + timestamp: int + tick_direction: int + price: float + mark_price: float + iv: float + instrument_name: str + index_price: float + direction: str + combo_trade_id: Optional[int] = 0, + combo_id: Optional[str] = '', + amount: float + +class LastTradesResult(Struct): + trades: list[Trade] + has_more: bool + + +# convert datetime obj timestamp to unixtime in milliseconds +def deribit_timestamp(when): + return int((when.timestamp() * 1000) + (when.microsecond / 1000)) + + +def str_to_cb_sym(name: str) -> Symbol: + base, strike_price, expiry_date, option_type = name.split('-') + + quote = base + + if option_type == 'put': + option_type = PUT + elif option_type == 'call': + option_type = CALL + else: + raise Exception("Couldn\'t parse option type") + + return Symbol( + base, quote, + type=OPTION, + strike_price=strike_price, + option_type=option_type, + expiry_date=expiry_date, + expiry_normalize=False) + + +def piker_sym_to_cb_sym(name: str) -> Symbol: + base, expiry_date, strike_price, option_type = tuple( + name.upper().split('-')) + + quote = base + + if option_type == 'P': + option_type = PUT + elif option_type == 'C': + option_type = CALL + else: + raise Exception("Couldn\'t parse option type") + + return Symbol( + base, quote, + type=OPTION, + strike_price=strike_price, + option_type=option_type, + expiry_date=expiry_date.upper()) + + +def cb_sym_to_deribit_inst(sym: Symbol): + # cryptofeed normalized + cb_norm = ['F', 'G', 'H', 'J', 'K', 'M', 'N', 'Q', 'U', 'V', 'X', 'Z'] + + # deribit specific + months = ['JAN', 'FEB', 'MAR', 'APR', 'MAY', 'JUN', 'JUL', 'AUG', 'SEP', 'OCT', 'NOV', 'DEC'] + + exp = sym.expiry_date + + # YYMDD + # 01234 + year, month, day = ( + exp[:2], months[cb_norm.index(exp[2:3])], exp[3:]) + + otype = 'C' if sym.option_type == CALL else 'P' + + return f'{sym.base}-{day}{month}{year}-{sym.strike_price}-{otype}' + + +def get_config() -> dict[str, Any]: + + conf, path = config.load() + + section = conf.get('deribit') + + # TODO: document why we send this, basically because logging params for cryptofeed + conf['log'] = {} + conf['log']['disabled'] = True + + if section is None: + log.warning(f'No config section found for deribit in {path}') + + return conf + + +class Client: + + def __init__(self, json_rpc: Callable) -> None: + self._pairs: dict[str, Any] = None + + config = get_config().get('deribit', {}) + + if ('key_id' in config) and ('key_secret' in config): + self._key_id = config['key_id'] + self._key_secret = config['key_secret'] + + else: + self._key_id = None + self._key_secret = None + + self.json_rpc = json_rpc + + @property + def currencies(self): + return ['btc', 'eth', 'sol', 'usd'] + + async def get_balances(self, kind: str = 'option') -> dict[str, float]: + """Return the set of positions for this account + by symbol. + """ + balances = {} + + for currency in self.currencies: + resp = await self.json_rpc( + 'private/get_positions', params={ + 'currency': currency.upper(), + 'kind': kind}) + + balances[currency] = resp.result + + return balances + + async def get_assets(self) -> dict[str, float]: + """Return the set of asset balances for this account + by symbol. + """ + balances = {} + + for currency in self.currencies: + resp = await self.json_rpc( + 'private/get_account_summary', params={ + 'currency': currency.upper()}) + + balances[currency] = resp.result['balance'] + + return balances + + async def submit_limit( + self, + symbol: str, + price: float, + action: str, + size: float + ) -> dict: + """Place an order + """ + params = { + 'instrument_name': symbol.upper(), + 'amount': size, + 'type': 'limit', + 'price': price, + } + resp = await self.json_rpc( + f'private/{action}', params) + + return resp.result + + async def submit_cancel(self, oid: str): + """Send cancel request for order id + """ + resp = await self.json_rpc( + 'private/cancel', {'order_id': oid}) + return resp.result + + async def symbol_info( + self, + instrument: Optional[str] = None, + currency: str = 'btc', # BTC, ETH, SOL, USDC + kind: str = 'option', + expired: bool = False + ) -> dict[str, Any]: + """Get symbol info for the exchange. + + """ + if self._pairs: + return self._pairs + + # will retrieve all symbols by default + params = { + 'currency': currency.upper(), + 'kind': kind, + 'expired': str(expired).lower() + } + + resp = await self.json_rpc('public/get_instruments', params) + results = resp.result + + instruments = { + item['instrument_name'].lower(): item + for item in results + } + + if instrument is not None: + return instruments[instrument] + else: + return instruments + + async def cache_symbols( + self, + ) -> dict: + if not self._pairs: + self._pairs = await self.symbol_info() + + return self._pairs + + async def search_symbols( + self, + pattern: str, + limit: int = 30, + ) -> dict[str, Any]: + data = await self.symbol_info() + + matches = fuzzy.extractBests( + pattern, + data, + score_cutoff=35, + limit=limit + ) + # repack in dict form + return {item[0]['instrument_name'].lower(): item[0] + for item in matches} + + async def bars( + self, + symbol: str, + start_dt: Optional[datetime] = None, + end_dt: Optional[datetime] = None, + limit: int = 1000, + as_np: bool = True, + ) -> dict: + instrument = symbol + + if end_dt is None: + end_dt = pendulum.now('UTC') + + if start_dt is None: + start_dt = end_dt.start_of( + 'minute').subtract(minutes=limit) + + start_time = deribit_timestamp(start_dt) + end_time = deribit_timestamp(end_dt) + + # https://docs.deribit.com/#public-get_tradingview_chart_data + resp = await self.json_rpc( + 'public/get_tradingview_chart_data', + params={ + 'instrument_name': instrument.upper(), + 'start_timestamp': start_time, + 'end_timestamp': end_time, + 'resolution': '1' + }) + + result = KLinesResult(**resp.result) + new_bars = [] + for i in range(len(result.close)): + + _open = result.open[i] + high = result.high[i] + low = result.low[i] + close = result.close[i] + volume = result.volume[i] + + row = [ + (start_time + (i * (60 * 1000))) / 1000.0, # time + result.open[i], + result.high[i], + result.low[i], + result.close[i], + result.volume[i], + 0 + ] + + new_bars.append((i,) + tuple(row)) + + array = np.array(new_bars, dtype=_ohlc_dtype) if as_np else klines + return array + + async def last_trades( + self, + instrument: str, + count: int = 10 + ): + resp = await self.json_rpc( + 'public/get_last_trades_by_instrument', + params={ + 'instrument_name': instrument, + 'count': count + }) + + return LastTradesResult(**resp.result) + + +@acm +async def get_client( + is_brokercheck: bool = False +) -> Client: + + async with ( + trio.open_nursery() as n, + open_autorecon_ws(_testnet_ws_url) as ws + ): + + _rpc_id: Iterable = count(0) + _rpc_results: dict[int, dict] = {} + + _expiry_time: int = float('inf') + _access_token: Optional[str] = None + _refresh_token: Optional[str] = None + + async def json_rpc(method: str, params: dict) -> dict: + """perform a json rpc call and wait for the result, raise exception in + case of error field present on response + """ + msg = { + 'jsonrpc': '2.0', + 'id': next(_rpc_id), + 'method': method, + 'params': params + } + _id = msg['id'] + + _rpc_results[_id] = { + 'result': None, + 'event': trio.Event() + } + + await ws.send_msg(msg) + + await _rpc_results[_id]['event'].wait() + + ret = _rpc_results[_id]['result'] + + del _rpc_results[_id] + + if ret.error is not None: + raise Exception(json.dumps(ret.error, indent=4)) + + return ret + + async def _recv_task(): + """receives every ws message and stores it in its corresponding result + field, then sets the event to wakeup original sender tasks. + """ + async for msg in ws: + msg = JSONRPCResult(**msg) + + if msg.id not in _rpc_results: + # in case this message wasn't beign accounted for store it + _rpc_results[msg.id] = { + 'result': None, + 'event': trio.Event() + } + + _rpc_results[msg.id]['result'] = msg + _rpc_results[msg.id]['event'].set() + + client = Client(json_rpc) + + async def _auth_loop( + task_status: TaskStatus = trio.TASK_STATUS_IGNORED + ): + """Background task that adquires a first access token and then will + refresh the access token while the nursery isn't cancelled. + + https://docs.deribit.com/?python#authentication-2 + """ + renew_time = 10 + access_scope = 'trade:read_write' + _expiry_time = time.time() + got_access = False + nonlocal _refresh_token + nonlocal _access_token + + while True: + if time.time() - _expiry_time < renew_time: + # if we are close to token expiry time + + if _refresh_token != None: + # if we have a refresh token already dont need to send + # secret + params = { + 'grant_type': 'refresh_token', + 'refresh_token': _refresh_token, + 'scope': access_scope + } + + else: + # we don't have refresh token, send secret to initialize + params = { + 'grant_type': 'client_credentials', + 'client_id': client._key_id, + 'client_secret': client._key_secret, + 'scope': access_scope + } + + resp = await json_rpc('public/auth', params) + result = resp.result + + _expiry_time = time.time() + result['expires_in'] + _refresh_token = result['refresh_token'] + + if 'access_token' in result: + _access_token = result['access_token'] + + if not got_access: + # first time this loop runs we must indicate task is + # started, we have auth + got_access = True + task_status.started() + + else: + await trio.sleep(renew_time / 2) + + n.start_soon(_recv_task) + # if we have client creds launch auth loop + if client._key_id is not None: + await n.start(_auth_loop) + + await client.cache_symbols() + yield client + n.cancel_scope.cancel() + + +@acm +async def open_feed_handler(): + fh = FeedHandler(config=get_config()) + yield fh + await to_asyncio.run_task(fh.stop_async) + + +@acm +async def maybe_open_feed_handler() -> trio.abc.ReceiveStream: + async with maybe_open_context( + acm_func=open_feed_handler, + key='feedhandler', + ) as (cache_hit, fh): + yield fh + + +async def aio_price_feed_relay( + fh: FeedHandler, + instrument: Symbol, + from_trio: asyncio.Queue, + to_trio: trio.abc.SendChannel, +) -> None: + async def _trade(data: dict, receipt_timestamp): + to_trio.send_nowait(('trade', { + 'symbol': cb_sym_to_deribit_inst( + str_to_cb_sym(data.symbol)).lower(), + 'last': data, + 'broker_ts': time.time(), + 'data': data.to_dict(), + 'receipt': receipt_timestamp + })) + + async def _l1(data: dict, receipt_timestamp): + to_trio.send_nowait(('l1', { + 'symbol': cb_sym_to_deribit_inst( + str_to_cb_sym(data.symbol)).lower(), + 'ticks': [ + {'type': 'bid', + 'price': float(data.bid_price), 'size': float(data.bid_size)}, + {'type': 'bsize', + 'price': float(data.bid_price), 'size': float(data.bid_size)}, + {'type': 'ask', + 'price': float(data.ask_price), 'size': float(data.ask_size)}, + {'type': 'asize', + 'price': float(data.ask_price), 'size': float(data.ask_size)} + ] + })) + + fh.add_feed( + DERIBIT, + channels=[TRADES, L1_BOOK], + symbols=[piker_sym_to_cb_sym(instrument)], + callbacks={ + TRADES: _trade, + L1_BOOK: _l1 + }) + + if not fh.running: + fh.run( + start_loop=False, + install_signal_handlers=False) + + # sync with trio + to_trio.send_nowait(None) + + await asyncio.sleep(float('inf')) + + +@acm +async def open_price_feed( + instrument: str +) -> trio.abc.ReceiveStream: + async with maybe_open_feed_handler() as fh: + async with to_asyncio.open_channel_from( + partial( + aio_price_feed_relay, + fh, + instrument + ) + ) as (first, chan): + yield chan + + +@acm +async def maybe_open_price_feed( + instrument: str +) -> trio.abc.ReceiveStream: + + # TODO: add a predicate to maybe_open_context + async with maybe_open_context( + acm_func=open_price_feed, + kwargs={ + 'instrument': instrument + }, + key=f'{instrument}-price', + ) as (cache_hit, feed): + if cache_hit: + yield broadcast_receiver(feed, 10) + else: + yield feed + + + +async def aio_order_feed_relay( + fh: FeedHandler, + instrument: Symbol, + from_trio: asyncio.Queue, + to_trio: trio.abc.SendChannel, +) -> None: + async def _fill(data: dict, receipt_timestamp): + breakpoint() + + async def _order_info(data: dict, receipt_timestamp): + breakpoint() + + fh.add_feed( + DERIBIT, + channels=[FILLS, ORDER_INFO], + symbols=[instrument.upper()], + callbacks={ + FILLS: _fill, + ORDER_INFO: _order_info, + }) + + if not fh.running: + fh.run( + start_loop=False, + install_signal_handlers=False) + + # sync with trio + to_trio.send_nowait(None) + + await asyncio.sleep(float('inf')) + + +@acm +async def open_order_feed( + instrument: list[str] +) -> trio.abc.ReceiveStream: + async with maybe_open_feed_handler() as fh: + async with to_asyncio.open_channel_from( + partial( + aio_order_feed_relay, + fh, + instrument + ) + ) as (first, chan): + yield chan + + +@acm +async def maybe_open_order_feed( + instrument: str +) -> trio.abc.ReceiveStream: + + # TODO: add a predicate to maybe_open_context + async with maybe_open_context( + acm_func=open_order_feed, + kwargs={ + 'instrument': instrument, + 'fh': fh + }, + key=f'{instrument}-order', + ) as (cache_hit, feed): + if cache_hit: + yield broadcast_receiver(feed, 10) + else: + yield feed diff --git a/piker/brokers/deribit/assets/0_wallet.png b/piker/brokers/deribit/assets/0_wallet.png new file mode 100644 index 00000000..5f5880ee Binary files /dev/null and b/piker/brokers/deribit/assets/0_wallet.png differ diff --git a/piker/brokers/deribit/assets/1_wallet_select_deposit.png b/piker/brokers/deribit/assets/1_wallet_select_deposit.png new file mode 100644 index 00000000..7527f635 Binary files /dev/null and b/piker/brokers/deribit/assets/1_wallet_select_deposit.png differ diff --git a/piker/brokers/deribit/assets/2_gen_deposit_addr.png b/piker/brokers/deribit/assets/2_gen_deposit_addr.png new file mode 100644 index 00000000..2006a710 Binary files /dev/null and b/piker/brokers/deribit/assets/2_gen_deposit_addr.png differ diff --git a/piker/brokers/deribit/assets/3_deposit_address.png b/piker/brokers/deribit/assets/3_deposit_address.png new file mode 100644 index 00000000..9db6f163 Binary files /dev/null and b/piker/brokers/deribit/assets/3_deposit_address.png differ diff --git a/piker/brokers/deribit/assets/4_wallet_deposit_history.png b/piker/brokers/deribit/assets/4_wallet_deposit_history.png new file mode 100644 index 00000000..997922a6 Binary files /dev/null and b/piker/brokers/deribit/assets/4_wallet_deposit_history.png differ diff --git a/piker/brokers/deribit/feed.py b/piker/brokers/deribit/feed.py new file mode 100644 index 00000000..714ef61b --- /dev/null +++ b/piker/brokers/deribit/feed.py @@ -0,0 +1,200 @@ +# piker: trading gear for hackers +# Copyright (C) Guillermo Rodriguez (in stewardship for piker0) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +''' +Deribit backend. + +''' +from contextlib import asynccontextmanager as acm +from datetime import datetime +from typing import Any, Optional, Callable +import time + +import trio +from trio_typing import TaskStatus +import pendulum +from fuzzywuzzy import process as fuzzy +import numpy as np +import tractor + +from piker._cacheables import open_cached_client +from piker.log import get_logger, get_console_log +from piker.data import ShmArray +from piker.brokers._util import ( + BrokerError, + DataUnavailable, +) + +from cryptofeed import FeedHandler + +from cryptofeed.defines import ( + DERIBIT, L1_BOOK, TRADES, OPTION, CALL, PUT +) +from cryptofeed.symbols import Symbol + +from .api import ( + Client, Trade, + get_config, + str_to_cb_sym, piker_sym_to_cb_sym, cb_sym_to_deribit_inst, + maybe_open_price_feed +) + +_spawn_kwargs = { + 'infect_asyncio': True, +} + + +log = get_logger(__name__) + + +@acm +async def open_history_client( + instrument: str, +) -> tuple[Callable, int]: + + # TODO implement history getter for the new storage layer. + async with open_cached_client('deribit') as client: + + async def get_ohlc( + end_dt: Optional[datetime] = None, + start_dt: Optional[datetime] = None, + + ) -> tuple[ + np.ndarray, + datetime, # start + datetime, # end + ]: + + array = await client.bars( + instrument, + start_dt=start_dt, + end_dt=end_dt, + ) + if len(array) == 0: + raise DataUnavailable + + start_dt = pendulum.from_timestamp(array[0]['time']) + end_dt = pendulum.from_timestamp(array[-1]['time']) + + return array, start_dt, end_dt + + yield get_ohlc, {'erlangs': 3, 'rate': 3} + + +async def backfill_bars( + symbol: str, + shm: ShmArray, # type: ignore # noqa + task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED, +) -> None: + """Fill historical bars into shared mem / storage afap. + """ + instrument = symbol + with trio.CancelScope() as cs: + async with open_cached_client('deribit') as client: + bars = await client.bars(instrument) + shm.push(bars) + task_status.started(cs) + + +async def stream_quotes( + + send_chan: trio.abc.SendChannel, + symbols: list[str], + feed_is_live: trio.Event, + loglevel: str = None, + + # startup sync + task_status: TaskStatus[tuple[dict, dict]] = trio.TASK_STATUS_IGNORED, + +) -> None: + # XXX: required to propagate ``tractor`` loglevel to piker logging + get_console_log(loglevel or tractor.current_actor().loglevel) + + sym = symbols[0] + + async with ( + open_cached_client('deribit') as client, + send_chan as send_chan + ): + + init_msgs = { + # pass back token, and bool, signalling if we're the writer + # and that history has been written + sym: { + 'symbol_info': { + 'asset_type': 'option', + 'price_tick_size': 0.0005 + }, + 'shm_write_opts': {'sum_tick_vml': False}, + 'fqsn': sym, + }, + } + + nsym = piker_sym_to_cb_sym(sym) + + async with maybe_open_price_feed(sym) as stream: + + cache = await client.cache_symbols() + + last_trades = (await client.last_trades( + cb_sym_to_deribit_inst(nsym), count=1)).trades + + if len(last_trades) == 0: + last_trade = None + async for typ, quote in stream: + if typ == 'trade': + last_trade = Trade(**(quote['data'])) + break + + else: + last_trade = Trade(**(last_trades[0])) + + first_quote = { + 'symbol': sym, + 'last': last_trade.price, + 'brokerd_ts': last_trade.timestamp, + 'ticks': [{ + 'type': 'trade', + 'price': last_trade.price, + 'size': last_trade.amount, + 'broker_ts': last_trade.timestamp + }] + } + task_status.started((init_msgs, first_quote)) + + feed_is_live.set() + + async for typ, quote in stream: + topic = quote['symbol'] + await send_chan.send({topic: quote}) + + +@tractor.context +async def open_symbol_search( + ctx: tractor.Context, +) -> Client: + async with open_cached_client('deribit') as client: + + # load all symbols locally for fast search + cache = await client.cache_symbols() + await ctx.started() + + async with ctx.open_stream() as stream: + + async for pattern in stream: + # repack in dict form + await stream.send( + await client.search_symbols(pattern)) diff --git a/piker/data/_web_bs.py b/piker/data/_web_bs.py index 9c3fa796..78e82dfd 100644 --- a/piker/data/_web_bs.py +++ b/piker/data/_web_bs.py @@ -20,7 +20,7 @@ ToOlS fOr CoPInG wITh "tHE wEB" protocols. """ from contextlib import asynccontextmanager, AsyncExitStack from types import ModuleType -from typing import Any, Callable, AsyncGenerator +from typing import Any, Optional, Callable, AsyncGenerator import json import trio @@ -54,8 +54,8 @@ class NoBsWs: self, url: str, stack: AsyncExitStack, - fixture: Callable, - serializer: ModuleType = json, + fixture: Optional[Callable] = None, + serializer: ModuleType = json ): self.url = url self.fixture = fixture @@ -80,12 +80,14 @@ class NoBsWs: self._ws = await self._stack.enter_async_context( trio_websocket.open_websocket_url(self.url) ) - # rerun user code fixture - ret = await self._stack.enter_async_context( - self.fixture(self) - ) - assert ret is None + if self.fixture is not None: + # rerun user code fixture + ret = await self._stack.enter_async_context( + self.fixture(self) + ) + + assert ret is None log.info(f'Connection success: {self.url}') return self._ws @@ -121,13 +123,19 @@ class NoBsWs: except self.recon_errors: await self._connect() + def __aiter__(self): + return self + + async def __anext__(self): + return await self.recv_msg() + @asynccontextmanager async def open_autorecon_ws( url: str, # TODO: proper type annot smh - fixture: Callable, + fixture: Optional[Callable] = None, ) -> AsyncGenerator[tuple[...], NoBsWs]: """Apparently we can QoS for all sorts of reasons..so catch em. diff --git a/requirements.txt b/requirements.txt index 93d2aaa2..8f35a63f 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,7 +1,7 @@ # we require a pinned dev branch to get some edge features that # are often untested in tractor's CI and/or being tested by us # first before committing as core features in tractor's base. --e git+https://github.com/goodboy/tractor.git@master#egg=tractor +-e git+https://github.com/goodboy/tractor.git@reentrant_moc#egg=tractor # `pyqtgraph` peeps keep breaking, fixing, improving so might as well # pin this to a dev branch that we have more control over especially @@ -18,3 +18,6 @@ # ``asyncvnc`` for sending interactions to ib-gw inside docker -e git+https://github.com/pikers/asyncvnc.git@main#egg=asyncvnc + +# ``cryptofeed`` for connecting to various crypto exchanges + custom fixes +-e git+https://github.com/pikers/cryptofeed.git@date_parsing#egg=cryptofeed diff --git a/setup.py b/setup.py index 71cc078b..44d360fa 100755 --- a/setup.py +++ b/setup.py @@ -58,6 +58,7 @@ setup( # 'trimeter', # not released yet.. # 'tractor', # asyncvnc, + # 'cryptofeed', # brokers 'asks==2.4.8',