Compare commits
	
		
			8 Commits 
		
	
	
		
			gitea_feat
			...
			deribit
		
	
	| Author | SHA1 | Date | 
|---|---|---|
|  | 5c02dc6cd7 | |
|  | a5481e6746 | |
|  | 73fcd72256 | |
|  | fc2ceb5964 | |
|  | d0dbb44092 | |
|  | b20500c0d9 | |
|  | 5872095b09 | |
|  | 5f60923ac1 | 
|  | @ -50,3 +50,8 @@ prefer_data_account = [ | |||
| paper = "XX0000000" | ||||
| margin = "X0000000" | ||||
| ira = "X0000000" | ||||
| 
 | ||||
| 
 | ||||
| [deribit] | ||||
| key_id = 'XXXXXXXX' | ||||
| key_secret = 'Xx_XxXxXxXxXxXxXxXxXxXxXxXxXxXxXxXxXxXxXxXx' | ||||
|  |  | |||
|  | @ -0,0 +1,70 @@ | |||
| ``deribit`` backend | ||||
| ------------------ | ||||
| pretty good liquidity crypto derivatives, uses custom json rpc over ws for | ||||
| client methods, then `cryptofeed` for data streams. | ||||
| 
 | ||||
| status | ||||
| ****** | ||||
| - supports option charts | ||||
| - no order support yet  | ||||
| 
 | ||||
| 
 | ||||
| config | ||||
| ****** | ||||
| In order to get order mode support your ``brokers.toml`` | ||||
| needs to have something like the following: | ||||
| 
 | ||||
| .. code:: toml | ||||
| 
 | ||||
|     [deribit] | ||||
|     key_id = 'XXXXXXXX' | ||||
|     key_secret = 'Xx_XxXxXxXxXxXxXxXxXxXxXxXxXxXxXxXxXxXxXxXx' | ||||
| 
 | ||||
| To obtain an api id and secret you need to create an account, which can be a | ||||
| real market account over at: | ||||
| 
 | ||||
|     - deribit.com  (requires KYC for deposit address) | ||||
| 
 | ||||
| Or a testnet account over at: | ||||
| 
 | ||||
|     - test.deribit.com | ||||
| 
 | ||||
| For testnet once the account is created here is how you deposit fake crypto to | ||||
| try it out: | ||||
| 
 | ||||
| 1) Go to Wallet: | ||||
| 
 | ||||
| .. figure:: assets/0_wallet.png | ||||
|     :align: center | ||||
|     :target: assets/0_wallet.png | ||||
|     :alt: wallet page | ||||
| 
 | ||||
| 2) Then click on the elipsis menu and select deposit | ||||
| 
 | ||||
| .. figure:: assets/1_wallet_select_deposit.png | ||||
|     :align: center | ||||
|     :target: assets/1_wallet_select_deposit.png | ||||
|     :alt: wallet deposit page | ||||
| 
 | ||||
| 3) This will take you to the deposit address page | ||||
| 
 | ||||
| .. figure:: assets/2_gen_deposit_addr.png | ||||
|     :align: center | ||||
|     :target: assets/2_gen_deposit_addr.png | ||||
|     :alt: generate deposit address page | ||||
| 
 | ||||
| 4) After clicking generate you should see the address, copy it and go to the  | ||||
| [coin faucet](https://test.deribit.com/dericoin/BTC/deposit) and send fake | ||||
| coins to that address. | ||||
| 
 | ||||
| .. figure:: assets/3_deposit_address.png | ||||
|     :align: center | ||||
|     :target: assets/3_deposit_address.png | ||||
|     :alt: generated address | ||||
| 
 | ||||
| 5) Back in the deposit address page you should see the deposit in your history | ||||
| 
 | ||||
| .. figure:: assets/4_wallet_deposit_history.png | ||||
|     :align: center | ||||
|     :target: assets/4_wallet_deposit_history.png | ||||
|     :alt: wallet deposit history | ||||
|  | @ -32,8 +32,8 @@ from .feed import ( | |||
|     stream_quotes, | ||||
| ) | ||||
| # from .broker import ( | ||||
| #     trades_dialogue, | ||||
| #     norm_trade_records, | ||||
|     # trades_dialogue, | ||||
|     # norm_trade_records, | ||||
| # ) | ||||
| 
 | ||||
| __all__ = [ | ||||
|  | @ -50,7 +50,7 @@ __all__ = [ | |||
| __enable_modules__: list[str] = [ | ||||
|     'api', | ||||
|     'feed', | ||||
| #    'broker', | ||||
|     'broker', | ||||
| ] | ||||
| 
 | ||||
| # passed to ``tractor.ActorNursery.start_actor()`` | ||||
|  |  | |||
|  | @ -1,3 +1,6 @@ | |||
| # piker: trading gear for hackers | ||||
| # Copyright (C) Guillermo Rodriguez (in stewardship for piker0) | ||||
| 
 | ||||
| # This program is free software: you can redistribute it and/or modify | ||||
| # it under the terms of the GNU Affero General Public License as published by | ||||
| # the Free Software Foundation, either version 3 of the License, or | ||||
|  | @ -15,29 +18,52 @@ | |||
| Deribit backend. | ||||
| 
 | ||||
| ''' | ||||
| import json | ||||
| import time | ||||
| import asyncio | ||||
| 
 | ||||
| from contextlib import asynccontextmanager as acm | ||||
| from contextlib import asynccontextmanager as acm, AsyncExitStack | ||||
| from itertools import count | ||||
| from functools import partial | ||||
| from datetime import datetime | ||||
| from typing import Any, Optional, List | ||||
| from typing import Any, List, Dict, Optional, Iterable, Callable | ||||
| 
 | ||||
| import pendulum | ||||
| import asks | ||||
| import trio | ||||
| from trio_typing import Nursery, TaskStatus | ||||
| from fuzzywuzzy import process as fuzzy | ||||
| import numpy as np | ||||
| 
 | ||||
| from piker.data.types import Struct | ||||
| from piker.data._web_bs import NoBsWs, open_autorecon_ws | ||||
| 
 | ||||
| from .._util import resproc | ||||
| 
 | ||||
| from piker import config | ||||
| from piker.log import get_logger | ||||
| 
 | ||||
| from tractor.trionics import broadcast_receiver, BroadcastReceiver, maybe_open_context | ||||
| from tractor import to_asyncio | ||||
| 
 | ||||
| from cryptofeed import FeedHandler | ||||
| 
 | ||||
| from cryptofeed.defines import ( | ||||
|     DERIBIT, L1_BOOK, TRADES, OPTION, CALL, PUT | ||||
| ) | ||||
| from cryptofeed.symbols import Symbol | ||||
| 
 | ||||
| log = get_logger(__name__) | ||||
| 
 | ||||
| 
 | ||||
| _spawn_kwargs = { | ||||
|     'infect_asyncio': True, | ||||
| } | ||||
| 
 | ||||
| 
 | ||||
| _url = 'https://www.deribit.com' | ||||
| _ws_url = 'wss://www.deribit.com/ws/api/v2' | ||||
| _testnet_ws_url = 'wss://test.deribit.com/ws/api/v2' | ||||
| 
 | ||||
| 
 | ||||
| # Broker specific ohlc schema (rest) | ||||
|  | @ -55,7 +81,9 @@ _ohlc_dtype = [ | |||
| 
 | ||||
| class JSONRPCResult(Struct): | ||||
|     jsonrpc: str = '2.0' | ||||
|     result: dict  | ||||
|     id: int | ||||
|     result: Optional[dict] = None | ||||
|     error: Optional[dict] = None | ||||
|     usIn: int  | ||||
|     usOut: int  | ||||
|     usDiff: int  | ||||
|  | @ -83,6 +111,8 @@ class Trade(Struct): | |||
|     instrument_name: str | ||||
|     index_price: float | ||||
|     direction: str | ||||
|     combo_trade_id: Optional[int] = 0, | ||||
|     combo_id: Optional[str] = '', | ||||
|     amount: float | ||||
| 
 | ||||
| class LastTradesResult(Struct): | ||||
|  | @ -95,24 +125,161 @@ def deribit_timestamp(when): | |||
|     return int((when.timestamp() * 1000) + (when.microsecond / 1000)) | ||||
| 
 | ||||
| 
 | ||||
| def str_to_cb_sym(name: str) -> Symbol: | ||||
|     base, strike_price, expiry_date, option_type = name.split('-') | ||||
| 
 | ||||
|     quote = base | ||||
| 
 | ||||
|     if option_type == 'put': | ||||
|         option_type = PUT  | ||||
|     elif option_type  == 'call': | ||||
|         option_type = CALL | ||||
|     else: | ||||
|         raise Exception("Couldn\'t parse option type") | ||||
| 
 | ||||
|     return Symbol( | ||||
|         base, quote, | ||||
|         type=OPTION, | ||||
|         strike_price=strike_price, | ||||
|         option_type=option_type, | ||||
|         expiry_date=expiry_date, | ||||
|         expiry_normalize=False) | ||||
| 
 | ||||
| 
 | ||||
| def piker_sym_to_cb_sym(name: str) -> Symbol: | ||||
|     base, expiry_date, strike_price, option_type = tuple( | ||||
|         name.upper().split('-')) | ||||
| 
 | ||||
|     quote = base | ||||
| 
 | ||||
|     if option_type == 'P': | ||||
|         option_type = PUT  | ||||
|     elif option_type  == 'C': | ||||
|         option_type = CALL | ||||
|     else: | ||||
|         raise Exception("Couldn\'t parse option type") | ||||
| 
 | ||||
|     return Symbol( | ||||
|         base, quote, | ||||
|         type=OPTION, | ||||
|         strike_price=strike_price, | ||||
|         option_type=option_type, | ||||
|         expiry_date=expiry_date.upper()) | ||||
| 
 | ||||
| 
 | ||||
| def cb_sym_to_deribit_inst(sym: Symbol): | ||||
|     # cryptofeed normalized | ||||
|     cb_norm = ['F', 'G', 'H', 'J', 'K', 'M', 'N', 'Q', 'U', 'V', 'X', 'Z'] | ||||
| 
 | ||||
|     # deribit specific  | ||||
|     months = ['JAN', 'FEB', 'MAR', 'APR', 'MAY', 'JUN', 'JUL', 'AUG', 'SEP', 'OCT', 'NOV', 'DEC'] | ||||
| 
 | ||||
|     exp = sym.expiry_date | ||||
| 
 | ||||
|     # YYMDD | ||||
|     # 01234 | ||||
|     year, month, day = ( | ||||
|         exp[:2], months[cb_norm.index(exp[2:3])], exp[3:]) | ||||
| 
 | ||||
|     otype = 'C' if sym.option_type == CALL else 'P' | ||||
| 
 | ||||
|     return f'{sym.base}-{day}{month}{year}-{sym.strike_price}-{otype}' | ||||
| 
 | ||||
| 
 | ||||
| def get_config() -> dict[str, Any]: | ||||
| 
 | ||||
|     conf, path = config.load() | ||||
| 
 | ||||
|     section = conf.get('deribit') | ||||
| 
 | ||||
|     # TODO: document why we send this, basically because logging params for cryptofeed | ||||
|     conf['log'] = {} | ||||
|     conf['log']['disabled'] = True | ||||
| 
 | ||||
|     if section is None: | ||||
|         log.warning(f'No config section found for deribit in {path}') | ||||
| 
 | ||||
|     return conf  | ||||
| 
 | ||||
| 
 | ||||
| class Client: | ||||
| 
 | ||||
|     def __init__(self) -> None: | ||||
|         self._sesh = asks.Session(connections=4) | ||||
|         self._sesh.base_location = _url | ||||
|         self._pairs: dict[str, Any] = {} | ||||
|     def __init__(self, json_rpc: Callable) -> None: | ||||
|         self._pairs: dict[str, Any] = None | ||||
| 
 | ||||
|     async def _api( | ||||
|         config = get_config().get('deribit', {}) | ||||
| 
 | ||||
|         if ('key_id' in config) and ('key_secret' in config): | ||||
|             self._key_id = config['key_id'] | ||||
|             self._key_secret = config['key_secret'] | ||||
| 
 | ||||
|         else: | ||||
|             self._key_id = None | ||||
|             self._key_secret = None | ||||
| 
 | ||||
|         self.json_rpc = json_rpc | ||||
| 
 | ||||
|     @property | ||||
|     def currencies(self): | ||||
|         return ['btc', 'eth', 'sol', 'usd'] | ||||
| 
 | ||||
|     async def get_balances(self, kind: str = 'option') -> dict[str, float]: | ||||
|         """Return the set of positions for this account | ||||
|         by symbol. | ||||
|         """ | ||||
|         balances = {} | ||||
| 
 | ||||
|         for currency in self.currencies: | ||||
|             resp = await self.json_rpc( | ||||
|                 'private/get_positions', params={ | ||||
|                     'currency': currency.upper(), | ||||
|                     'kind': kind}) | ||||
| 
 | ||||
|             balances[currency] = resp.result | ||||
| 
 | ||||
|         return balances | ||||
| 
 | ||||
|     async def get_assets(self) -> dict[str, float]: | ||||
|         """Return the set of asset balances for this account | ||||
|         by symbol. | ||||
|         """ | ||||
|         balances = {} | ||||
| 
 | ||||
|         for currency in self.currencies: | ||||
|             resp = await self.json_rpc( | ||||
|                 'private/get_account_summary', params={ | ||||
|                     'currency': currency.upper()}) | ||||
| 
 | ||||
|             balances[currency] = resp.result['balance'] | ||||
| 
 | ||||
|         return balances | ||||
| 
 | ||||
|     async def submit_limit( | ||||
|         self, | ||||
|         method: str, | ||||
|         params: dict, | ||||
|     ) -> dict[str, Any]: | ||||
|         resp = await self._sesh.get( | ||||
|             path=f'/api/v2/public/{method}', | ||||
|             params=params, | ||||
|             timeout=float('inf') | ||||
|         ) | ||||
|         return resproc(resp, log) | ||||
|         symbol: str, | ||||
|         price: float, | ||||
|         action: str, | ||||
|         size: float | ||||
|     ) -> dict: | ||||
|         """Place an order | ||||
|         """ | ||||
|         params = { | ||||
|             'instrument_name': symbol.upper(), | ||||
|             'amount': size, | ||||
|             'type': 'limit', | ||||
|             'price': price, | ||||
|         } | ||||
|         resp = await self.json_rpc( | ||||
|             f'private/{action}', params) | ||||
| 
 | ||||
|         return resp.result | ||||
| 
 | ||||
|     async def submit_cancel(self, oid: str): | ||||
|         """Send cancel request for order id | ||||
|         """ | ||||
|         resp = await self.json_rpc( | ||||
|             'private/cancel', {'order_id': oid}) | ||||
|         return resp.result | ||||
| 
 | ||||
|     async def symbol_info( | ||||
|         self, | ||||
|  | @ -121,11 +288,11 @@ class Client: | |||
|         kind: str = 'option', | ||||
|         expired: bool = False | ||||
|     ) -> dict[str, Any]: | ||||
|         '''Get symbol info for the exchange. | ||||
|         """Get symbol info for the exchange. | ||||
| 
 | ||||
|         ''' | ||||
|         # TODO: we can load from our self._pairs cache | ||||
|         # on repeat calls... | ||||
|         """ | ||||
|         if self._pairs: | ||||
|             return self._pairs | ||||
| 
 | ||||
|         # will retrieve all symbols by default | ||||
|         params = { | ||||
|  | @ -134,13 +301,13 @@ class Client: | |||
|             'expired': str(expired).lower() | ||||
|         } | ||||
| 
 | ||||
|         resp = await self._api( | ||||
|             'get_instruments', params=params) | ||||
| 
 | ||||
|         results = resp['result'] | ||||
|         resp = await self.json_rpc('public/get_instruments', params) | ||||
|         results = resp.result | ||||
| 
 | ||||
|         instruments = { | ||||
|             item['instrument_name']: item for item in results} | ||||
|             item['instrument_name'].lower(): item | ||||
|             for item in results | ||||
|         } | ||||
| 
 | ||||
|         if instrument is not None: | ||||
|             return instruments[instrument] | ||||
|  | @ -158,20 +325,18 @@ class Client: | |||
|     async def search_symbols( | ||||
|         self, | ||||
|         pattern: str, | ||||
|         limit: int = None, | ||||
|         limit: int = 30, | ||||
|     ) -> dict[str, Any]: | ||||
|         if self._pairs is not None: | ||||
|             data = self._pairs | ||||
|         else: | ||||
|             data = await self.symbol_info() | ||||
|         data = await self.symbol_info() | ||||
| 
 | ||||
|         matches = fuzzy.extractBests( | ||||
|             pattern, | ||||
|             data, | ||||
|             score_cutoff=50, | ||||
|             score_cutoff=35, | ||||
|             limit=limit | ||||
|         ) | ||||
|         # repack in dict form | ||||
|         return {item[0]['instrument_name']: item[0] | ||||
|         return {item[0]['instrument_name'].lower(): item[0] | ||||
|                 for item in matches} | ||||
| 
 | ||||
|     async def bars( | ||||
|  | @ -195,19 +360,16 @@ class Client: | |||
|         end_time = deribit_timestamp(end_dt) | ||||
| 
 | ||||
|         # https://docs.deribit.com/#public-get_tradingview_chart_data | ||||
|         response = await self._api( | ||||
|             'get_tradingview_chart_data', | ||||
|         resp = await self.json_rpc( | ||||
|             'public/get_tradingview_chart_data', | ||||
|             params={ | ||||
|                 'instrument_name': instrument.upper(), | ||||
|                 'start_timestamp': start_time, | ||||
|                 'end_timestamp': end_time, | ||||
|                 'resolution': '1' | ||||
|             } | ||||
|         ) | ||||
|             }) | ||||
| 
 | ||||
|         klines = JSONRPCResult(**response) | ||||
|      | ||||
|         result = KLinesResult(**klines.result) | ||||
|         result = KLinesResult(**resp.result) | ||||
|         new_bars = [] | ||||
|         for i in range(len(result.close)): | ||||
| 
 | ||||
|  | @ -237,19 +399,308 @@ class Client: | |||
|         instrument: str, | ||||
|         count: int = 10 | ||||
|     ): | ||||
|         response = await self._api( | ||||
|             'get_last_trades_by_instrument', | ||||
|         resp = await self.json_rpc( | ||||
|             'public/get_last_trades_by_instrument', | ||||
|             params={ | ||||
|                 'instrument_name': instrument, | ||||
|                 'count': count | ||||
|             } | ||||
|         ) | ||||
|             }) | ||||
| 
 | ||||
|         return LastTradesResult(**(JSONRPCResult(**response).result)) | ||||
|         return LastTradesResult(**resp.result) | ||||
| 
 | ||||
| 
 | ||||
| @acm | ||||
| async def get_client() -> Client: | ||||
|     client = Client() | ||||
|     await client.cache_symbols() | ||||
|     yield client | ||||
|     async with ( | ||||
|         trio.open_nursery() as n, | ||||
|         open_autorecon_ws(_testnet_ws_url) as ws | ||||
|     ): | ||||
| 
 | ||||
|         _rpc_id: Iterable = count(0) | ||||
|         _rpc_results: Dict[int, Dict] = {} | ||||
| 
 | ||||
|         _expiry_time: int = float('inf') | ||||
|         _access_token: Optional[str] = None | ||||
|         _refresh_token: Optional[str] = None | ||||
| 
 | ||||
|         def _next_json_body(method: str, params: Dict): | ||||
|             """get the typical json rpc 2.0 msg body and increment the req id | ||||
|             """ | ||||
|             return { | ||||
|                 'jsonrpc': '2.0', | ||||
|                 'id': next(_rpc_id), | ||||
|                 'method': method, | ||||
|                 'params': params | ||||
|             } | ||||
| 
 | ||||
|         async def json_rpc(method: str, params: Dict) -> Dict: | ||||
|             """perform a json rpc call and wait for the result, raise exception in | ||||
|             case of error field present on response | ||||
|             """ | ||||
|             msg = _next_json_body(method, params) | ||||
|             _id = msg['id'] | ||||
| 
 | ||||
|             _rpc_results[_id] = { | ||||
|                 'result': None, | ||||
|                 'event': trio.Event() | ||||
|             } | ||||
| 
 | ||||
|             await ws.send_msg(msg) | ||||
| 
 | ||||
|             await _rpc_results[_id]['event'].wait() | ||||
| 
 | ||||
|             ret = _rpc_results[_id]['result'] | ||||
| 
 | ||||
|             del _rpc_results[_id] | ||||
| 
 | ||||
|             if ret.error is not None: | ||||
|                 raise Exception(json.dumps(ret.error, indent=4)) | ||||
| 
 | ||||
|             return ret | ||||
| 
 | ||||
|         async def _recv_task(): | ||||
|             """receives every ws message and stores it in its corresponding result | ||||
|             field, then sets the event to wakeup original sender tasks. | ||||
|             """ | ||||
|             async for msg in ws: | ||||
|                 msg = JSONRPCResult(**msg) | ||||
| 
 | ||||
|                 if msg.id not in _rpc_results: | ||||
|                     # in case this message wasn't beign accounted for store it  | ||||
|                     _rpc_results[msg.id] = { | ||||
|                         'result': None, | ||||
|                         'event': trio.Event() | ||||
|                     } | ||||
| 
 | ||||
|                 _rpc_results[msg.id]['result'] = msg | ||||
|                 _rpc_results[msg.id]['event'].set() | ||||
| 
 | ||||
|         client = Client(json_rpc) | ||||
| 
 | ||||
|         async def _auth_loop( | ||||
|             task_status: TaskStatus = trio.TASK_STATUS_IGNORED | ||||
|         ): | ||||
|             """Background task that adquires a first access token and then will | ||||
|             refresh the access token while the nursery isn't cancelled. | ||||
| 
 | ||||
|             https://docs.deribit.com/?python#authentication-2 | ||||
|             """ | ||||
|             renew_time = 10 | ||||
|             access_scope = 'trade:read_write' | ||||
|             _expiry_time = time.time() | ||||
|             got_access = False | ||||
|             nonlocal _refresh_token | ||||
|             nonlocal _access_token | ||||
| 
 | ||||
|             while True: | ||||
|                 if time.time() - _expiry_time < renew_time: | ||||
|                     # if we are close to token expiry time | ||||
| 
 | ||||
|                     if _refresh_token != None: | ||||
|                         # if we have a refresh token already dont need to send | ||||
|                         # secret | ||||
|                         params = { | ||||
|                             'grant_type': 'refresh_token', | ||||
|                             'refresh_token': _refresh_token, | ||||
|                             'scope': access_scope | ||||
|                         } | ||||
| 
 | ||||
|                     else: | ||||
|                         # we don't have refresh token, send secret to initialize | ||||
|                         params = { | ||||
|                             'grant_type': 'client_credentials', | ||||
|                             'client_id': client._key_id, | ||||
|                             'client_secret': client._key_secret, | ||||
|                             'scope': access_scope | ||||
|                         } | ||||
| 
 | ||||
|                     resp = await json_rpc('public/auth', params) | ||||
|                     result = resp.result | ||||
| 
 | ||||
|                     _expiry_time = time.time() + result['expires_in'] | ||||
|                     _refresh_token = result['refresh_token'] | ||||
| 
 | ||||
|                     if 'access_token' in result: | ||||
|                         _access_token = result['access_token'] | ||||
| 
 | ||||
|                     if not got_access: | ||||
|                         # first time this loop runs we must indicate task is | ||||
|                         # started, we have auth | ||||
|                         got_access = True | ||||
|                         task_status.started() | ||||
| 
 | ||||
|                 else: | ||||
|                     await trio.sleep(renew_time / 2) | ||||
| 
 | ||||
|         n.start_soon(_recv_task) | ||||
|         # if we have client creds launch auth loop | ||||
|         if client._key_id is not None: | ||||
|             await n.start(_auth_loop) | ||||
| 
 | ||||
|         await client.cache_symbols() | ||||
|         yield client | ||||
| 
 | ||||
| 
 | ||||
| @acm | ||||
| async def open_feed_handler(): | ||||
|     fh = FeedHandler(config=get_config()) | ||||
|     yield fh | ||||
|     await to_asyncio.run_task(fh.stop_async) | ||||
| 
 | ||||
| 
 | ||||
| @acm | ||||
| async def maybe_open_feed_handler() -> trio.abc.ReceiveStream: | ||||
|     async with maybe_open_context( | ||||
|         acm_func=open_feed_handler, | ||||
|         key='feedhandler', | ||||
|     ) as (cache_hit, fh): | ||||
|         yield fh | ||||
| 
 | ||||
| 
 | ||||
| @acm | ||||
| async def open_price_feed( | ||||
|     instrument: str | ||||
| ) -> trio.abc.ReceiveStream: | ||||
| 
 | ||||
|     # XXX: hangs when going into this ctx mngr | ||||
|     async with maybe_open_feed_handler() as fh: | ||||
| 
 | ||||
|         async def relay( | ||||
|             from_trio: asyncio.Queue, | ||||
|             to_trio: trio.abc.SendChannel, | ||||
|         ) -> None: | ||||
|             async def _trade(data: dict, receipt_timestamp): | ||||
|                 to_trio.send_nowait(('trade', { | ||||
|                     'symbol': cb_sym_to_deribit_inst( | ||||
|                         str_to_cb_sym(data.symbol)).lower(), | ||||
|                     'last': data, | ||||
|                     'broker_ts': time.time(), | ||||
|                     'data': data.to_dict(), | ||||
|                     'receipt': receipt_timestamp | ||||
|                 })) | ||||
| 
 | ||||
|             async def _l1(data: dict, receipt_timestamp): | ||||
|                 to_trio.send_nowait(('l1', { | ||||
|                     'symbol': cb_sym_to_deribit_inst( | ||||
|                         str_to_cb_sym(data.symbol)).lower(), | ||||
|                     'ticks': [ | ||||
|                         {'type': 'bid', | ||||
|                             'price': float(data.bid_price), 'size': float(data.bid_size)}, | ||||
|                         {'type': 'bsize', | ||||
|                             'price': float(data.bid_price), 'size': float(data.bid_size)}, | ||||
|                         {'type': 'ask', | ||||
|                             'price': float(data.ask_price), 'size': float(data.ask_size)}, | ||||
|                         {'type': 'asize', | ||||
|                             'price': float(data.ask_price), 'size': float(data.ask_size)} | ||||
|                     ] | ||||
|                 })) | ||||
| 
 | ||||
|             fh.add_feed( | ||||
|                 DERIBIT, | ||||
|                 channels=[TRADES, L1_BOOK], | ||||
|                 symbols=[instrument], | ||||
|                 callbacks={ | ||||
|                     TRADES: _trade, | ||||
|                     L1_BOOK: _l1 | ||||
|                 }) | ||||
| 
 | ||||
|             if not fh.running: | ||||
|                 fh.run( | ||||
|                     start_loop=False, | ||||
|                     install_signal_handlers=False) | ||||
| 
 | ||||
|             # sync with trio | ||||
|             to_trio.send_nowait(None) | ||||
| 
 | ||||
|             try: | ||||
|                 await asyncio.sleep(float('inf')) | ||||
| 
 | ||||
|             except asyncio.exceptions.CancelledError: | ||||
|                 ... | ||||
| 
 | ||||
|         async with to_asyncio.open_channel_from( | ||||
|             relay | ||||
|         ) as (first, chan): | ||||
|             yield chan | ||||
| 
 | ||||
| 
 | ||||
| @acm | ||||
| async def maybe_open_price_feed( | ||||
|     instrument: str | ||||
| ) -> trio.abc.ReceiveStream: | ||||
| 
 | ||||
|     # TODO: add a predicate to maybe_open_context | ||||
|     async with maybe_open_context( | ||||
|         acm_func=open_price_feed, | ||||
|         kwargs={ | ||||
|             'instrument': instrument | ||||
|         }, | ||||
|         key=f'{instrument}-price', | ||||
|     ) as (cache_hit, feed): | ||||
|         if cache_hit: | ||||
|             yield broadcast_receiver(feed, 10) | ||||
|         else: | ||||
|             yield feed | ||||
| 
 | ||||
| @acm | ||||
| async def open_order_feed( | ||||
|     instrument: List[str] | ||||
| ) -> trio.abc.ReceiveStream: | ||||
| 
 | ||||
|     async with maybe_open_feed_handler() as fh: | ||||
| 
 | ||||
|         async def relay( | ||||
|             from_trio: asyncio.Queue, | ||||
|             to_trio: trio.abc.SendChannel, | ||||
|         ) -> None: | ||||
|             async def _fill(data: dict, receipt_timestamp): | ||||
|                 breakpoint() | ||||
| 
 | ||||
|             async def _order_info(data: dict, receipt_timestamp): | ||||
|                 breakpoint() | ||||
| 
 | ||||
|             fh.add_feed( | ||||
|                 DERIBIT, | ||||
|                 channels=[FILLS, ORDER_INFO], | ||||
|                 symbols=[instrument], | ||||
|                 callbacks={ | ||||
|                     FILLS: _fill, | ||||
|                     ORDER_INFO: _order_info, | ||||
|                 }) | ||||
| 
 | ||||
|             if not fh.running: | ||||
|                 fh.run( | ||||
|                     start_loop=False, | ||||
|                     install_signal_handlers=False) | ||||
| 
 | ||||
|             # sync with trio | ||||
|             to_trio.send_nowait(None) | ||||
| 
 | ||||
|             try: | ||||
|                 await asyncio.sleep(float('inf')) | ||||
| 
 | ||||
|             except asyncio.exceptions.CancelledError: | ||||
|                 ... | ||||
| 
 | ||||
|         async with to_asyncio.open_channel_from( | ||||
|             relay | ||||
|         ) as (first, chan): | ||||
|             yield chan | ||||
| 
 | ||||
| @acm | ||||
| async def maybe_open_order_feed( | ||||
|     instrument: str | ||||
| ) -> trio.abc.ReceiveStream: | ||||
| 
 | ||||
|     # TODO: add a predicate to maybe_open_context | ||||
|     async with maybe_open_context( | ||||
|         acm_func=open_order_feed, | ||||
|         kwargs={ | ||||
|             'instrument': instrument | ||||
|         }, | ||||
|         key=f'{instrument}-order', | ||||
|     ) as (cache_hit, feed): | ||||
|         if cache_hit: | ||||
|             yield broadcast_receiver(feed, 10) | ||||
|         else: | ||||
|             yield feed | ||||
|  |  | |||
										
											Binary file not shown.
										
									
								
							| After Width: | Height: | Size: 169 KiB | 
										
											Binary file not shown.
										
									
								
							| After Width: | Height: | Size: 106 KiB | 
										
											Binary file not shown.
										
									
								
							| After Width: | Height: | Size: 59 KiB | 
										
											Binary file not shown.
										
									
								
							| After Width: | Height: | Size: 70 KiB | 
										
											Binary file not shown.
										
									
								
							| After Width: | Height: | Size: 132 KiB | 
|  | @ -0,0 +1,44 @@ | |||
| # piker: trading gear for hackers | ||||
| # Copyright (C) Guillermo Rodriguez (in stewardship for piker0) | ||||
| 
 | ||||
| # This program is free software: you can redistribute it and/or modify | ||||
| # it under the terms of the GNU Affero General Public License as published by | ||||
| # the Free Software Foundation, either version 3 of the License, or | ||||
| # (at your option) any later version. | ||||
| 
 | ||||
| # This program is distributed in the hope that it will be useful, | ||||
| # but WITHOUT ANY WARRANTY; without even the implied warranty of | ||||
| # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the | ||||
| # GNU Affero General Public License for more details. | ||||
| 
 | ||||
| # You should have received a copy of the GNU Affero General Public License | ||||
| # along with this program.  If not, see <https://www.gnu.org/licenses/>. | ||||
| ''' | ||||
| Order api and machinery | ||||
| 
 | ||||
| ''' | ||||
| 
 | ||||
| from typing import Any, AsyncIterator | ||||
| 
 | ||||
| import tractor | ||||
| 
 | ||||
| 
 | ||||
| @tractor.context | ||||
| async def trades_dialogue( | ||||
|     ctx: tractor.Context, | ||||
|     loglevel: str = None, | ||||
| 
 | ||||
| ) -> AsyncIterator[dict[str, Any]]: | ||||
| 
 | ||||
|     # XXX: required to propagate ``tractor`` loglevel to piker logging | ||||
|     get_console_log(loglevel or tractor.current_actor().loglevel) | ||||
| 
 | ||||
|     async with open_cached_client('deribit') as client: | ||||
|         if not client._key_id: | ||||
|             raise RuntimeError('Missing Deribit API key in `brokers.toml`!?!?') | ||||
| 
 | ||||
|         acc_name = f'deribit.{client._key_id}' | ||||
| 
 | ||||
|         await client.cache_symbols() | ||||
| 
 | ||||
|         breakpoint() | ||||
|  | @ -1,3 +1,6 @@ | |||
| # piker: trading gear for hackers | ||||
| # Copyright (C) Guillermo Rodriguez (in stewardship for piker0) | ||||
| 
 | ||||
| # This program is free software: you can redistribute it and/or modify | ||||
| # it under the terms of the GNU Affero General Public License as published by | ||||
| # the Free Software Foundation, either version 3 of the License, or | ||||
|  | @ -15,9 +18,6 @@ | |||
| Deribit backend. | ||||
| 
 | ||||
| ''' | ||||
| 
 | ||||
| import asyncio | ||||
| from async_generator import aclosing | ||||
| from contextlib import asynccontextmanager as acm | ||||
| from datetime import datetime | ||||
| from typing import Any, Optional, List, Callable | ||||
|  | @ -29,9 +29,7 @@ import pendulum | |||
| from fuzzywuzzy import process as fuzzy | ||||
| import numpy as np | ||||
| import tractor | ||||
| from tractor import to_asyncio | ||||
| 
 | ||||
| from piker import config | ||||
| from piker._cacheables import open_cached_client | ||||
| from piker.log import get_logger, get_console_log | ||||
| from piker.data import ShmArray | ||||
|  | @ -47,170 +45,21 @@ from cryptofeed.defines import ( | |||
| ) | ||||
| from cryptofeed.symbols import Symbol | ||||
| 
 | ||||
| from .api import Client, Trade | ||||
| from .api import ( | ||||
|     Client, Trade, | ||||
|     get_config, | ||||
|     str_to_cb_sym, piker_sym_to_cb_sym, cb_sym_to_deribit_inst, | ||||
|     maybe_open_price_feed | ||||
| ) | ||||
| 
 | ||||
| _spawn_kwargs = { | ||||
|     'infect_asyncio': True, | ||||
| } | ||||
| 
 | ||||
| 
 | ||||
| def get_config() -> dict[str, Any]: | ||||
| 
 | ||||
|     conf, path = config.load() | ||||
| 
 | ||||
|     section = conf.get('deribit') | ||||
| 
 | ||||
|     if section is None: | ||||
|         log.warning(f'No config section found for deribit in {path}') | ||||
|         return {} | ||||
| 
 | ||||
|     conf['log'] = {} | ||||
|     conf['log']['disabled'] = True | ||||
| 
 | ||||
| #    conf['log']['filename'] = '/tmp/feedhandler.log' | ||||
| #    conf['log']['level'] = 'WARNING' | ||||
| 
 | ||||
|     return conf  | ||||
| 
 | ||||
| 
 | ||||
| log = get_logger(__name__) | ||||
| 
 | ||||
| 
 | ||||
| _url = 'https://www.deribit.com' | ||||
| 
 | ||||
| 
 | ||||
| def str_to_cb_sym(name: str) -> Symbol: | ||||
|     base, strike_price, expiry_date, option_type = name.split('-') | ||||
| 
 | ||||
|     quote = base | ||||
| 
 | ||||
|     if option_type == 'put': | ||||
|         option_type = PUT  | ||||
|     elif option_type  == 'call': | ||||
|         option_type = CALL | ||||
|     else: | ||||
|         raise BaseException("Couldn\'t parse option type") | ||||
| 
 | ||||
|     return Symbol( | ||||
|         base, quote, | ||||
|         type=OPTION, | ||||
|         strike_price=strike_price, | ||||
|         option_type=option_type, | ||||
|         expiry_date=expiry_date, | ||||
|         expiry_normalize=False) | ||||
| 
 | ||||
| 
 | ||||
| 
 | ||||
| def piker_sym_to_cb_sym(name: str) -> Symbol: | ||||
|     base, expiry_date, strike_price, option_type = tuple( | ||||
|         name.upper().split('-')) | ||||
| 
 | ||||
|     quote = base | ||||
| 
 | ||||
|     if option_type == 'P': | ||||
|         option_type = PUT  | ||||
|     elif option_type  == 'C': | ||||
|         option_type = CALL | ||||
|     else: | ||||
|         raise BaseException("Couldn\'t parse option type") | ||||
| 
 | ||||
|     return Symbol( | ||||
|         base, quote, | ||||
|         type=OPTION, | ||||
|         strike_price=strike_price, | ||||
|         option_type=option_type, | ||||
|         expiry_date=expiry_date.upper()) | ||||
| 
 | ||||
| 
 | ||||
| def cb_sym_to_deribit_inst(sym: Symbol): | ||||
|     # cryptofeed normalized | ||||
|     cb_norm = ['F', 'G', 'H', 'J', 'K', 'M', 'N', 'Q', 'U', 'V', 'X', 'Z'] | ||||
| 
 | ||||
|     # deribit specific  | ||||
|     months = ['JAN', 'FEB', 'MAR', 'APR', 'MAY', 'JUN', 'JUL', 'AUG', 'SEP', 'OCT', 'NOV', 'DEC'] | ||||
|      | ||||
|     exp = sym.expiry_date | ||||
| 
 | ||||
|     # YYMDD | ||||
|     # 01234 | ||||
|     year, month, day = ( | ||||
|         exp[:2], months[cb_norm.index(exp[2:3])], exp[3:]) | ||||
| 
 | ||||
|     otype = 'C' if sym.option_type == CALL else 'P' | ||||
| 
 | ||||
|     return f'{sym.base}-{day}{month}{year}-{sym.strike_price}-{otype}' | ||||
| 
 | ||||
| 
 | ||||
| # inside here we are in an asyncio context | ||||
| async def open_aio_cryptofeed_relay( | ||||
|     from_trio: asyncio.Queue, | ||||
|     to_trio: trio.abc.SendChannel, | ||||
|     instruments: List[str] = [] | ||||
| ) -> None: | ||||
| 
 | ||||
|     instruments = [piker_sym_to_cb_sym(i) for i in instruments] | ||||
| 
 | ||||
|     async def trade_cb(data: dict, receipt_timestamp): | ||||
|         to_trio.send_nowait(('trade', { | ||||
|             'symbol': cb_sym_to_deribit_inst( | ||||
|                 str_to_cb_sym(data.symbol)).lower(), | ||||
|             'last': data, | ||||
|             'broker_ts': time.time(), | ||||
|             'data': data.to_dict(), | ||||
|             'receipt': receipt_timestamp | ||||
|         })) | ||||
| 
 | ||||
|     async def l1_book_cb(data: dict, receipt_timestamp): | ||||
|         to_trio.send_nowait(('l1', { | ||||
|             'symbol': cb_sym_to_deribit_inst( | ||||
|                 str_to_cb_sym(data.symbol)).lower(), | ||||
|             'ticks': [ | ||||
|                 {'type': 'bid', | ||||
|                     'price': float(data.bid_price), 'size': float(data.bid_size)}, | ||||
|                 {'type': 'bsize', | ||||
|                     'price': float(data.bid_price), 'size': float(data.bid_size)}, | ||||
|                 {'type': 'ask', | ||||
|                     'price': float(data.ask_price), 'size': float(data.ask_size)}, | ||||
|                 {'type': 'asize', | ||||
|                     'price': float(data.ask_price), 'size': float(data.ask_size)} | ||||
|             ] | ||||
|         })) | ||||
| 
 | ||||
|     fh = FeedHandler(config=get_config()) | ||||
|     fh.run(start_loop=False) | ||||
| 
 | ||||
|     fh.add_feed( | ||||
|         DERIBIT, | ||||
|         channels=[L1_BOOK], | ||||
|         symbols=instruments, | ||||
|         callbacks={L1_BOOK: l1_book_cb}) | ||||
| 
 | ||||
|     fh.add_feed( | ||||
|         DERIBIT, | ||||
|         channels=[TRADES], | ||||
|         symbols=instruments, | ||||
|         callbacks={TRADES: trade_cb}) | ||||
| 
 | ||||
|     # sync with trio | ||||
|     to_trio.send_nowait(None) | ||||
| 
 | ||||
|     await asyncio.sleep(float('inf')) | ||||
| 
 | ||||
| 
 | ||||
| @acm | ||||
| async def open_cryptofeeds( | ||||
| 
 | ||||
|     instruments: List[str] | ||||
| 
 | ||||
| ) -> trio.abc.ReceiveStream: | ||||
| 
 | ||||
|     async with to_asyncio.open_channel_from( | ||||
|         open_aio_cryptofeed_relay, | ||||
|         instruments=instruments, | ||||
|     ) as (first, chan): | ||||
|         yield chan | ||||
| 
 | ||||
| 
 | ||||
| @acm | ||||
| async def open_history_client( | ||||
|     instrument: str, | ||||
|  | @ -278,9 +127,7 @@ async def stream_quotes( | |||
| 
 | ||||
|     async with ( | ||||
|         open_cached_client('deribit') as client, | ||||
|         send_chan as send_chan, | ||||
|         trio.open_nursery() as n, | ||||
|         open_cryptofeeds(symbols) as stream  | ||||
|         send_chan as send_chan | ||||
|     ): | ||||
| 
 | ||||
|         init_msgs = { | ||||
|  | @ -298,26 +145,36 @@ async def stream_quotes( | |||
| 
 | ||||
|         nsym = piker_sym_to_cb_sym(sym) | ||||
| 
 | ||||
|         # keep client cached for real-time section | ||||
|         cache = await client.cache_symbols() | ||||
|         async with maybe_open_price_feed(sym) as stream: | ||||
| 
 | ||||
|         last_trade = Trade(**(await client.last_trades( | ||||
|             cb_sym_to_deribit_inst(nsym), count=1)).trades[0]) | ||||
|             cache = await client.cache_symbols() | ||||
| 
 | ||||
|         first_quote = { | ||||
|             'symbol': sym, | ||||
|             'last': last_trade.price, | ||||
|             'brokerd_ts': last_trade.timestamp, | ||||
|             'ticks': [{ | ||||
|                 'type': 'trade', | ||||
|                 'price': last_trade.price, | ||||
|                 'size': last_trade.amount, | ||||
|                 'broker_ts': last_trade.timestamp | ||||
|             }] | ||||
|         } | ||||
|         task_status.started((init_msgs,  first_quote)) | ||||
|             last_trades = (await client.last_trades( | ||||
|                 cb_sym_to_deribit_inst(nsym), count=1)).trades | ||||
| 
 | ||||
|             if len(last_trades) == 0: | ||||
|                 last_trade = None | ||||
|                 async for typ, quote in stream: | ||||
|                     if typ == 'trade': | ||||
|                         last_trade = Trade(**(quote['data'])) | ||||
|                         break | ||||
| 
 | ||||
|             else: | ||||
|                 last_trade = Trade(**(last_trades[0])) | ||||
| 
 | ||||
|             first_quote = { | ||||
|                 'symbol': sym, | ||||
|                 'last': last_trade.price, | ||||
|                 'brokerd_ts': last_trade.timestamp, | ||||
|                 'ticks': [{ | ||||
|                     'type': 'trade', | ||||
|                     'price': last_trade.price, | ||||
|                     'size': last_trade.amount, | ||||
|                     'broker_ts': last_trade.timestamp | ||||
|                 }] | ||||
|             } | ||||
|             task_status.started((init_msgs,  first_quote)) | ||||
| 
 | ||||
|         async with aclosing(stream): | ||||
|             feed_is_live.set() | ||||
| 
 | ||||
|             async for typ, quote in stream: | ||||
|  | @ -338,15 +195,6 @@ async def open_symbol_search( | |||
|         async with ctx.open_stream() as stream: | ||||
| 
 | ||||
|             async for pattern in stream: | ||||
|                 # results = await client.symbol_info(sym=pattern.upper()) | ||||
| 
 | ||||
|                 matches = fuzzy.extractBests( | ||||
|                     pattern, | ||||
|                     cache, | ||||
|                     score_cutoff=30, | ||||
|                 ) | ||||
|                 # repack in dict form | ||||
|                 await stream.send( | ||||
|                     {item[0]['instrument_name']: item[0] | ||||
|                      for item in matches} | ||||
|                 ) | ||||
|                     await client.search_symbols(pattern)) | ||||
|  |  | |||
|  | @ -20,7 +20,7 @@ ToOlS fOr CoPInG wITh "tHE wEB" protocols. | |||
| """ | ||||
| from contextlib import asynccontextmanager, AsyncExitStack | ||||
| from types import ModuleType | ||||
| from typing import Any, Callable, AsyncGenerator | ||||
| from typing import Any, Optional, Callable, AsyncGenerator | ||||
| import json | ||||
| 
 | ||||
| import trio | ||||
|  | @ -54,8 +54,8 @@ class NoBsWs: | |||
|         self, | ||||
|         url: str, | ||||
|         stack: AsyncExitStack, | ||||
|         fixture: Callable, | ||||
|         serializer: ModuleType = json, | ||||
|         fixture: Optional[Callable] = None, | ||||
|         serializer: ModuleType = json | ||||
|     ): | ||||
|         self.url = url | ||||
|         self.fixture = fixture | ||||
|  | @ -80,12 +80,14 @@ class NoBsWs: | |||
|                 self._ws = await self._stack.enter_async_context( | ||||
|                     trio_websocket.open_websocket_url(self.url) | ||||
|                 ) | ||||
|                 # rerun user code fixture | ||||
|                 ret = await self._stack.enter_async_context( | ||||
|                     self.fixture(self) | ||||
|                 ) | ||||
| 
 | ||||
|                 assert ret is None | ||||
|                 if self.fixture is not None: | ||||
|                     # rerun user code fixture | ||||
|                     ret = await self._stack.enter_async_context( | ||||
|                         self.fixture(self) | ||||
|                     ) | ||||
| 
 | ||||
|                     assert ret is None | ||||
| 
 | ||||
|                 log.info(f'Connection success: {self.url}') | ||||
|                 return self._ws | ||||
|  | @ -121,13 +123,19 @@ class NoBsWs: | |||
|             except self.recon_errors: | ||||
|                 await self._connect() | ||||
| 
 | ||||
|     def __aiter__(self): | ||||
|         return self | ||||
| 
 | ||||
|     async def __anext__(self): | ||||
|         return await self.recv_msg() | ||||
| 
 | ||||
| 
 | ||||
| @asynccontextmanager | ||||
| async def open_autorecon_ws( | ||||
|     url: str, | ||||
| 
 | ||||
|     # TODO: proper type annot smh | ||||
|     fixture: Callable, | ||||
|     fixture: Optional[Callable] = None, | ||||
| 
 | ||||
| ) -> AsyncGenerator[tuple[...],  NoBsWs]: | ||||
|     """Apparently we can QoS for all sorts of reasons..so catch em. | ||||
|  |  | |||
|  | @ -20,4 +20,4 @@ | |||
| -e git+https://github.com/pikers/asyncvnc.git@main#egg=asyncvnc | ||||
| 
 | ||||
| # ``cryptofeed`` for connecting to various crypto exchanges + custom fixes | ||||
| -e git+https://github.com/guilledk/cryptofeed.git@date_parsing#egg=cryptofeed | ||||
| -e git+https://github.com/pikers/cryptofeed.git@date_parsing#egg=cryptofeed | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue