Compare commits
	
		
			8 Commits 
		
	
	
		
			gitea_feat
			...
			deribit
		
	
	| Author | SHA1 | Date | 
|---|---|---|
|  | 5c02dc6cd7 | |
|  | a5481e6746 | |
|  | 73fcd72256 | |
|  | fc2ceb5964 | |
|  | d0dbb44092 | |
|  | b20500c0d9 | |
|  | 5872095b09 | |
|  | 5f60923ac1 | 
|  | @ -50,3 +50,8 @@ prefer_data_account = [ | ||||||
| paper = "XX0000000" | paper = "XX0000000" | ||||||
| margin = "X0000000" | margin = "X0000000" | ||||||
| ira = "X0000000" | ira = "X0000000" | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | [deribit] | ||||||
|  | key_id = 'XXXXXXXX' | ||||||
|  | key_secret = 'Xx_XxXxXxXxXxXxXxXxXxXxXxXxXxXxXxXxXxXxXxXx' | ||||||
|  |  | ||||||
|  | @ -0,0 +1,70 @@ | ||||||
|  | ``deribit`` backend | ||||||
|  | ------------------ | ||||||
|  | pretty good liquidity crypto derivatives, uses custom json rpc over ws for | ||||||
|  | client methods, then `cryptofeed` for data streams. | ||||||
|  | 
 | ||||||
|  | status | ||||||
|  | ****** | ||||||
|  | - supports option charts | ||||||
|  | - no order support yet  | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | config | ||||||
|  | ****** | ||||||
|  | In order to get order mode support your ``brokers.toml`` | ||||||
|  | needs to have something like the following: | ||||||
|  | 
 | ||||||
|  | .. code:: toml | ||||||
|  | 
 | ||||||
|  |     [deribit] | ||||||
|  |     key_id = 'XXXXXXXX' | ||||||
|  |     key_secret = 'Xx_XxXxXxXxXxXxXxXxXxXxXxXxXxXxXxXxXxXxXxXx' | ||||||
|  | 
 | ||||||
|  | To obtain an api id and secret you need to create an account, which can be a | ||||||
|  | real market account over at: | ||||||
|  | 
 | ||||||
|  |     - deribit.com  (requires KYC for deposit address) | ||||||
|  | 
 | ||||||
|  | Or a testnet account over at: | ||||||
|  | 
 | ||||||
|  |     - test.deribit.com | ||||||
|  | 
 | ||||||
|  | For testnet once the account is created here is how you deposit fake crypto to | ||||||
|  | try it out: | ||||||
|  | 
 | ||||||
|  | 1) Go to Wallet: | ||||||
|  | 
 | ||||||
|  | .. figure:: assets/0_wallet.png | ||||||
|  |     :align: center | ||||||
|  |     :target: assets/0_wallet.png | ||||||
|  |     :alt: wallet page | ||||||
|  | 
 | ||||||
|  | 2) Then click on the elipsis menu and select deposit | ||||||
|  | 
 | ||||||
|  | .. figure:: assets/1_wallet_select_deposit.png | ||||||
|  |     :align: center | ||||||
|  |     :target: assets/1_wallet_select_deposit.png | ||||||
|  |     :alt: wallet deposit page | ||||||
|  | 
 | ||||||
|  | 3) This will take you to the deposit address page | ||||||
|  | 
 | ||||||
|  | .. figure:: assets/2_gen_deposit_addr.png | ||||||
|  |     :align: center | ||||||
|  |     :target: assets/2_gen_deposit_addr.png | ||||||
|  |     :alt: generate deposit address page | ||||||
|  | 
 | ||||||
|  | 4) After clicking generate you should see the address, copy it and go to the  | ||||||
|  | [coin faucet](https://test.deribit.com/dericoin/BTC/deposit) and send fake | ||||||
|  | coins to that address. | ||||||
|  | 
 | ||||||
|  | .. figure:: assets/3_deposit_address.png | ||||||
|  |     :align: center | ||||||
|  |     :target: assets/3_deposit_address.png | ||||||
|  |     :alt: generated address | ||||||
|  | 
 | ||||||
|  | 5) Back in the deposit address page you should see the deposit in your history | ||||||
|  | 
 | ||||||
|  | .. figure:: assets/4_wallet_deposit_history.png | ||||||
|  |     :align: center | ||||||
|  |     :target: assets/4_wallet_deposit_history.png | ||||||
|  |     :alt: wallet deposit history | ||||||
|  | @ -50,7 +50,7 @@ __all__ = [ | ||||||
| __enable_modules__: list[str] = [ | __enable_modules__: list[str] = [ | ||||||
|     'api', |     'api', | ||||||
|     'feed', |     'feed', | ||||||
| #    'broker', |     'broker', | ||||||
| ] | ] | ||||||
| 
 | 
 | ||||||
| # passed to ``tractor.ActorNursery.start_actor()`` | # passed to ``tractor.ActorNursery.start_actor()`` | ||||||
|  |  | ||||||
|  | @ -1,3 +1,6 @@ | ||||||
|  | # piker: trading gear for hackers | ||||||
|  | # Copyright (C) Guillermo Rodriguez (in stewardship for piker0) | ||||||
|  | 
 | ||||||
| # This program is free software: you can redistribute it and/or modify | # This program is free software: you can redistribute it and/or modify | ||||||
| # it under the terms of the GNU Affero General Public License as published by | # it under the terms of the GNU Affero General Public License as published by | ||||||
| # the Free Software Foundation, either version 3 of the License, or | # the Free Software Foundation, either version 3 of the License, or | ||||||
|  | @ -15,29 +18,52 @@ | ||||||
| Deribit backend. | Deribit backend. | ||||||
| 
 | 
 | ||||||
| ''' | ''' | ||||||
|  | import json | ||||||
|  | import time | ||||||
|  | import asyncio | ||||||
| 
 | 
 | ||||||
| from contextlib import asynccontextmanager as acm | from contextlib import asynccontextmanager as acm, AsyncExitStack | ||||||
|  | from itertools import count | ||||||
|  | from functools import partial | ||||||
| from datetime import datetime | from datetime import datetime | ||||||
| from typing import Any, Optional, List | from typing import Any, List, Dict, Optional, Iterable, Callable | ||||||
| 
 | 
 | ||||||
| import pendulum | import pendulum | ||||||
| import asks | import asks | ||||||
|  | import trio | ||||||
|  | from trio_typing import Nursery, TaskStatus | ||||||
| from fuzzywuzzy import process as fuzzy | from fuzzywuzzy import process as fuzzy | ||||||
| import numpy as np | import numpy as np | ||||||
| 
 | 
 | ||||||
| from piker.data.types import Struct | from piker.data.types import Struct | ||||||
|  | from piker.data._web_bs import NoBsWs, open_autorecon_ws | ||||||
| 
 | 
 | ||||||
| from .._util import resproc | from .._util import resproc | ||||||
| 
 | 
 | ||||||
| from piker import config | from piker import config | ||||||
| from piker.log import get_logger | from piker.log import get_logger | ||||||
| 
 | 
 | ||||||
|  | from tractor.trionics import broadcast_receiver, BroadcastReceiver, maybe_open_context | ||||||
|  | from tractor import to_asyncio | ||||||
|  | 
 | ||||||
|  | from cryptofeed import FeedHandler | ||||||
|  | 
 | ||||||
|  | from cryptofeed.defines import ( | ||||||
|  |     DERIBIT, L1_BOOK, TRADES, OPTION, CALL, PUT | ||||||
|  | ) | ||||||
| from cryptofeed.symbols import Symbol | from cryptofeed.symbols import Symbol | ||||||
| 
 | 
 | ||||||
| log = get_logger(__name__) | log = get_logger(__name__) | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
|  | _spawn_kwargs = { | ||||||
|  |     'infect_asyncio': True, | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
| _url = 'https://www.deribit.com' | _url = 'https://www.deribit.com' | ||||||
|  | _ws_url = 'wss://www.deribit.com/ws/api/v2' | ||||||
|  | _testnet_ws_url = 'wss://test.deribit.com/ws/api/v2' | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| # Broker specific ohlc schema (rest) | # Broker specific ohlc schema (rest) | ||||||
|  | @ -55,7 +81,9 @@ _ohlc_dtype = [ | ||||||
| 
 | 
 | ||||||
| class JSONRPCResult(Struct): | class JSONRPCResult(Struct): | ||||||
|     jsonrpc: str = '2.0' |     jsonrpc: str = '2.0' | ||||||
|     result: dict  |     id: int | ||||||
|  |     result: Optional[dict] = None | ||||||
|  |     error: Optional[dict] = None | ||||||
|     usIn: int  |     usIn: int  | ||||||
|     usOut: int  |     usOut: int  | ||||||
|     usDiff: int  |     usDiff: int  | ||||||
|  | @ -83,6 +111,8 @@ class Trade(Struct): | ||||||
|     instrument_name: str |     instrument_name: str | ||||||
|     index_price: float |     index_price: float | ||||||
|     direction: str |     direction: str | ||||||
|  |     combo_trade_id: Optional[int] = 0, | ||||||
|  |     combo_id: Optional[str] = '', | ||||||
|     amount: float |     amount: float | ||||||
| 
 | 
 | ||||||
| class LastTradesResult(Struct): | class LastTradesResult(Struct): | ||||||
|  | @ -95,24 +125,161 @@ def deribit_timestamp(when): | ||||||
|     return int((when.timestamp() * 1000) + (when.microsecond / 1000)) |     return int((when.timestamp() * 1000) + (when.microsecond / 1000)) | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
|  | def str_to_cb_sym(name: str) -> Symbol: | ||||||
|  |     base, strike_price, expiry_date, option_type = name.split('-') | ||||||
|  | 
 | ||||||
|  |     quote = base | ||||||
|  | 
 | ||||||
|  |     if option_type == 'put': | ||||||
|  |         option_type = PUT  | ||||||
|  |     elif option_type  == 'call': | ||||||
|  |         option_type = CALL | ||||||
|  |     else: | ||||||
|  |         raise Exception("Couldn\'t parse option type") | ||||||
|  | 
 | ||||||
|  |     return Symbol( | ||||||
|  |         base, quote, | ||||||
|  |         type=OPTION, | ||||||
|  |         strike_price=strike_price, | ||||||
|  |         option_type=option_type, | ||||||
|  |         expiry_date=expiry_date, | ||||||
|  |         expiry_normalize=False) | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | def piker_sym_to_cb_sym(name: str) -> Symbol: | ||||||
|  |     base, expiry_date, strike_price, option_type = tuple( | ||||||
|  |         name.upper().split('-')) | ||||||
|  | 
 | ||||||
|  |     quote = base | ||||||
|  | 
 | ||||||
|  |     if option_type == 'P': | ||||||
|  |         option_type = PUT  | ||||||
|  |     elif option_type  == 'C': | ||||||
|  |         option_type = CALL | ||||||
|  |     else: | ||||||
|  |         raise Exception("Couldn\'t parse option type") | ||||||
|  | 
 | ||||||
|  |     return Symbol( | ||||||
|  |         base, quote, | ||||||
|  |         type=OPTION, | ||||||
|  |         strike_price=strike_price, | ||||||
|  |         option_type=option_type, | ||||||
|  |         expiry_date=expiry_date.upper()) | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | def cb_sym_to_deribit_inst(sym: Symbol): | ||||||
|  |     # cryptofeed normalized | ||||||
|  |     cb_norm = ['F', 'G', 'H', 'J', 'K', 'M', 'N', 'Q', 'U', 'V', 'X', 'Z'] | ||||||
|  | 
 | ||||||
|  |     # deribit specific  | ||||||
|  |     months = ['JAN', 'FEB', 'MAR', 'APR', 'MAY', 'JUN', 'JUL', 'AUG', 'SEP', 'OCT', 'NOV', 'DEC'] | ||||||
|  | 
 | ||||||
|  |     exp = sym.expiry_date | ||||||
|  | 
 | ||||||
|  |     # YYMDD | ||||||
|  |     # 01234 | ||||||
|  |     year, month, day = ( | ||||||
|  |         exp[:2], months[cb_norm.index(exp[2:3])], exp[3:]) | ||||||
|  | 
 | ||||||
|  |     otype = 'C' if sym.option_type == CALL else 'P' | ||||||
|  | 
 | ||||||
|  |     return f'{sym.base}-{day}{month}{year}-{sym.strike_price}-{otype}' | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | def get_config() -> dict[str, Any]: | ||||||
|  | 
 | ||||||
|  |     conf, path = config.load() | ||||||
|  | 
 | ||||||
|  |     section = conf.get('deribit') | ||||||
|  | 
 | ||||||
|  |     # TODO: document why we send this, basically because logging params for cryptofeed | ||||||
|  |     conf['log'] = {} | ||||||
|  |     conf['log']['disabled'] = True | ||||||
|  | 
 | ||||||
|  |     if section is None: | ||||||
|  |         log.warning(f'No config section found for deribit in {path}') | ||||||
|  | 
 | ||||||
|  |     return conf  | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
| class Client: | class Client: | ||||||
| 
 | 
 | ||||||
|     def __init__(self) -> None: |     def __init__(self, json_rpc: Callable) -> None: | ||||||
|         self._sesh = asks.Session(connections=4) |         self._pairs: dict[str, Any] = None | ||||||
|         self._sesh.base_location = _url |  | ||||||
|         self._pairs: dict[str, Any] = {} |  | ||||||
| 
 | 
 | ||||||
|     async def _api( |         config = get_config().get('deribit', {}) | ||||||
|  | 
 | ||||||
|  |         if ('key_id' in config) and ('key_secret' in config): | ||||||
|  |             self._key_id = config['key_id'] | ||||||
|  |             self._key_secret = config['key_secret'] | ||||||
|  | 
 | ||||||
|  |         else: | ||||||
|  |             self._key_id = None | ||||||
|  |             self._key_secret = None | ||||||
|  | 
 | ||||||
|  |         self.json_rpc = json_rpc | ||||||
|  | 
 | ||||||
|  |     @property | ||||||
|  |     def currencies(self): | ||||||
|  |         return ['btc', 'eth', 'sol', 'usd'] | ||||||
|  | 
 | ||||||
|  |     async def get_balances(self, kind: str = 'option') -> dict[str, float]: | ||||||
|  |         """Return the set of positions for this account | ||||||
|  |         by symbol. | ||||||
|  |         """ | ||||||
|  |         balances = {} | ||||||
|  | 
 | ||||||
|  |         for currency in self.currencies: | ||||||
|  |             resp = await self.json_rpc( | ||||||
|  |                 'private/get_positions', params={ | ||||||
|  |                     'currency': currency.upper(), | ||||||
|  |                     'kind': kind}) | ||||||
|  | 
 | ||||||
|  |             balances[currency] = resp.result | ||||||
|  | 
 | ||||||
|  |         return balances | ||||||
|  | 
 | ||||||
|  |     async def get_assets(self) -> dict[str, float]: | ||||||
|  |         """Return the set of asset balances for this account | ||||||
|  |         by symbol. | ||||||
|  |         """ | ||||||
|  |         balances = {} | ||||||
|  | 
 | ||||||
|  |         for currency in self.currencies: | ||||||
|  |             resp = await self.json_rpc( | ||||||
|  |                 'private/get_account_summary', params={ | ||||||
|  |                     'currency': currency.upper()}) | ||||||
|  | 
 | ||||||
|  |             balances[currency] = resp.result['balance'] | ||||||
|  | 
 | ||||||
|  |         return balances | ||||||
|  | 
 | ||||||
|  |     async def submit_limit( | ||||||
|         self, |         self, | ||||||
|         method: str, |         symbol: str, | ||||||
|         params: dict, |         price: float, | ||||||
|     ) -> dict[str, Any]: |         action: str, | ||||||
|         resp = await self._sesh.get( |         size: float | ||||||
|             path=f'/api/v2/public/{method}', |     ) -> dict: | ||||||
|             params=params, |         """Place an order | ||||||
|             timeout=float('inf') |         """ | ||||||
|         ) |         params = { | ||||||
|         return resproc(resp, log) |             'instrument_name': symbol.upper(), | ||||||
|  |             'amount': size, | ||||||
|  |             'type': 'limit', | ||||||
|  |             'price': price, | ||||||
|  |         } | ||||||
|  |         resp = await self.json_rpc( | ||||||
|  |             f'private/{action}', params) | ||||||
|  | 
 | ||||||
|  |         return resp.result | ||||||
|  | 
 | ||||||
|  |     async def submit_cancel(self, oid: str): | ||||||
|  |         """Send cancel request for order id | ||||||
|  |         """ | ||||||
|  |         resp = await self.json_rpc( | ||||||
|  |             'private/cancel', {'order_id': oid}) | ||||||
|  |         return resp.result | ||||||
| 
 | 
 | ||||||
|     async def symbol_info( |     async def symbol_info( | ||||||
|         self, |         self, | ||||||
|  | @ -121,11 +288,11 @@ class Client: | ||||||
|         kind: str = 'option', |         kind: str = 'option', | ||||||
|         expired: bool = False |         expired: bool = False | ||||||
|     ) -> dict[str, Any]: |     ) -> dict[str, Any]: | ||||||
|         '''Get symbol info for the exchange. |         """Get symbol info for the exchange. | ||||||
| 
 | 
 | ||||||
|         ''' |         """ | ||||||
|         # TODO: we can load from our self._pairs cache |         if self._pairs: | ||||||
|         # on repeat calls... |             return self._pairs | ||||||
| 
 | 
 | ||||||
|         # will retrieve all symbols by default |         # will retrieve all symbols by default | ||||||
|         params = { |         params = { | ||||||
|  | @ -134,13 +301,13 @@ class Client: | ||||||
|             'expired': str(expired).lower() |             'expired': str(expired).lower() | ||||||
|         } |         } | ||||||
| 
 | 
 | ||||||
|         resp = await self._api( |         resp = await self.json_rpc('public/get_instruments', params) | ||||||
|             'get_instruments', params=params) |         results = resp.result | ||||||
| 
 |  | ||||||
|         results = resp['result'] |  | ||||||
| 
 | 
 | ||||||
|         instruments = { |         instruments = { | ||||||
|             item['instrument_name']: item for item in results} |             item['instrument_name'].lower(): item | ||||||
|  |             for item in results | ||||||
|  |         } | ||||||
| 
 | 
 | ||||||
|         if instrument is not None: |         if instrument is not None: | ||||||
|             return instruments[instrument] |             return instruments[instrument] | ||||||
|  | @ -158,20 +325,18 @@ class Client: | ||||||
|     async def search_symbols( |     async def search_symbols( | ||||||
|         self, |         self, | ||||||
|         pattern: str, |         pattern: str, | ||||||
|         limit: int = None, |         limit: int = 30, | ||||||
|     ) -> dict[str, Any]: |     ) -> dict[str, Any]: | ||||||
|         if self._pairs is not None: |  | ||||||
|             data = self._pairs |  | ||||||
|         else: |  | ||||||
|         data = await self.symbol_info() |         data = await self.symbol_info() | ||||||
| 
 | 
 | ||||||
|         matches = fuzzy.extractBests( |         matches = fuzzy.extractBests( | ||||||
|             pattern, |             pattern, | ||||||
|             data, |             data, | ||||||
|             score_cutoff=50, |             score_cutoff=35, | ||||||
|  |             limit=limit | ||||||
|         ) |         ) | ||||||
|         # repack in dict form |         # repack in dict form | ||||||
|         return {item[0]['instrument_name']: item[0] |         return {item[0]['instrument_name'].lower(): item[0] | ||||||
|                 for item in matches} |                 for item in matches} | ||||||
| 
 | 
 | ||||||
|     async def bars( |     async def bars( | ||||||
|  | @ -195,19 +360,16 @@ class Client: | ||||||
|         end_time = deribit_timestamp(end_dt) |         end_time = deribit_timestamp(end_dt) | ||||||
| 
 | 
 | ||||||
|         # https://docs.deribit.com/#public-get_tradingview_chart_data |         # https://docs.deribit.com/#public-get_tradingview_chart_data | ||||||
|         response = await self._api( |         resp = await self.json_rpc( | ||||||
|             'get_tradingview_chart_data', |             'public/get_tradingview_chart_data', | ||||||
|             params={ |             params={ | ||||||
|                 'instrument_name': instrument.upper(), |                 'instrument_name': instrument.upper(), | ||||||
|                 'start_timestamp': start_time, |                 'start_timestamp': start_time, | ||||||
|                 'end_timestamp': end_time, |                 'end_timestamp': end_time, | ||||||
|                 'resolution': '1' |                 'resolution': '1' | ||||||
|             } |             }) | ||||||
|         ) |  | ||||||
| 
 | 
 | ||||||
|         klines = JSONRPCResult(**response) |         result = KLinesResult(**resp.result) | ||||||
|      |  | ||||||
|         result = KLinesResult(**klines.result) |  | ||||||
|         new_bars = [] |         new_bars = [] | ||||||
|         for i in range(len(result.close)): |         for i in range(len(result.close)): | ||||||
| 
 | 
 | ||||||
|  | @ -237,19 +399,308 @@ class Client: | ||||||
|         instrument: str, |         instrument: str, | ||||||
|         count: int = 10 |         count: int = 10 | ||||||
|     ): |     ): | ||||||
|         response = await self._api( |         resp = await self.json_rpc( | ||||||
|             'get_last_trades_by_instrument', |             'public/get_last_trades_by_instrument', | ||||||
|             params={ |             params={ | ||||||
|                 'instrument_name': instrument, |                 'instrument_name': instrument, | ||||||
|                 'count': count |                 'count': count | ||||||
|             } |             }) | ||||||
|         ) |  | ||||||
| 
 | 
 | ||||||
|         return LastTradesResult(**(JSONRPCResult(**response).result)) |         return LastTradesResult(**resp.result) | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| @acm | @acm | ||||||
| async def get_client() -> Client: | async def get_client() -> Client: | ||||||
|     client = Client() |     async with ( | ||||||
|  |         trio.open_nursery() as n, | ||||||
|  |         open_autorecon_ws(_testnet_ws_url) as ws | ||||||
|  |     ): | ||||||
|  | 
 | ||||||
|  |         _rpc_id: Iterable = count(0) | ||||||
|  |         _rpc_results: Dict[int, Dict] = {} | ||||||
|  | 
 | ||||||
|  |         _expiry_time: int = float('inf') | ||||||
|  |         _access_token: Optional[str] = None | ||||||
|  |         _refresh_token: Optional[str] = None | ||||||
|  | 
 | ||||||
|  |         def _next_json_body(method: str, params: Dict): | ||||||
|  |             """get the typical json rpc 2.0 msg body and increment the req id | ||||||
|  |             """ | ||||||
|  |             return { | ||||||
|  |                 'jsonrpc': '2.0', | ||||||
|  |                 'id': next(_rpc_id), | ||||||
|  |                 'method': method, | ||||||
|  |                 'params': params | ||||||
|  |             } | ||||||
|  | 
 | ||||||
|  |         async def json_rpc(method: str, params: Dict) -> Dict: | ||||||
|  |             """perform a json rpc call and wait for the result, raise exception in | ||||||
|  |             case of error field present on response | ||||||
|  |             """ | ||||||
|  |             msg = _next_json_body(method, params) | ||||||
|  |             _id = msg['id'] | ||||||
|  | 
 | ||||||
|  |             _rpc_results[_id] = { | ||||||
|  |                 'result': None, | ||||||
|  |                 'event': trio.Event() | ||||||
|  |             } | ||||||
|  | 
 | ||||||
|  |             await ws.send_msg(msg) | ||||||
|  | 
 | ||||||
|  |             await _rpc_results[_id]['event'].wait() | ||||||
|  | 
 | ||||||
|  |             ret = _rpc_results[_id]['result'] | ||||||
|  | 
 | ||||||
|  |             del _rpc_results[_id] | ||||||
|  | 
 | ||||||
|  |             if ret.error is not None: | ||||||
|  |                 raise Exception(json.dumps(ret.error, indent=4)) | ||||||
|  | 
 | ||||||
|  |             return ret | ||||||
|  | 
 | ||||||
|  |         async def _recv_task(): | ||||||
|  |             """receives every ws message and stores it in its corresponding result | ||||||
|  |             field, then sets the event to wakeup original sender tasks. | ||||||
|  |             """ | ||||||
|  |             async for msg in ws: | ||||||
|  |                 msg = JSONRPCResult(**msg) | ||||||
|  | 
 | ||||||
|  |                 if msg.id not in _rpc_results: | ||||||
|  |                     # in case this message wasn't beign accounted for store it  | ||||||
|  |                     _rpc_results[msg.id] = { | ||||||
|  |                         'result': None, | ||||||
|  |                         'event': trio.Event() | ||||||
|  |                     } | ||||||
|  | 
 | ||||||
|  |                 _rpc_results[msg.id]['result'] = msg | ||||||
|  |                 _rpc_results[msg.id]['event'].set() | ||||||
|  | 
 | ||||||
|  |         client = Client(json_rpc) | ||||||
|  | 
 | ||||||
|  |         async def _auth_loop( | ||||||
|  |             task_status: TaskStatus = trio.TASK_STATUS_IGNORED | ||||||
|  |         ): | ||||||
|  |             """Background task that adquires a first access token and then will | ||||||
|  |             refresh the access token while the nursery isn't cancelled. | ||||||
|  | 
 | ||||||
|  |             https://docs.deribit.com/?python#authentication-2 | ||||||
|  |             """ | ||||||
|  |             renew_time = 10 | ||||||
|  |             access_scope = 'trade:read_write' | ||||||
|  |             _expiry_time = time.time() | ||||||
|  |             got_access = False | ||||||
|  |             nonlocal _refresh_token | ||||||
|  |             nonlocal _access_token | ||||||
|  | 
 | ||||||
|  |             while True: | ||||||
|  |                 if time.time() - _expiry_time < renew_time: | ||||||
|  |                     # if we are close to token expiry time | ||||||
|  | 
 | ||||||
|  |                     if _refresh_token != None: | ||||||
|  |                         # if we have a refresh token already dont need to send | ||||||
|  |                         # secret | ||||||
|  |                         params = { | ||||||
|  |                             'grant_type': 'refresh_token', | ||||||
|  |                             'refresh_token': _refresh_token, | ||||||
|  |                             'scope': access_scope | ||||||
|  |                         } | ||||||
|  | 
 | ||||||
|  |                     else: | ||||||
|  |                         # we don't have refresh token, send secret to initialize | ||||||
|  |                         params = { | ||||||
|  |                             'grant_type': 'client_credentials', | ||||||
|  |                             'client_id': client._key_id, | ||||||
|  |                             'client_secret': client._key_secret, | ||||||
|  |                             'scope': access_scope | ||||||
|  |                         } | ||||||
|  | 
 | ||||||
|  |                     resp = await json_rpc('public/auth', params) | ||||||
|  |                     result = resp.result | ||||||
|  | 
 | ||||||
|  |                     _expiry_time = time.time() + result['expires_in'] | ||||||
|  |                     _refresh_token = result['refresh_token'] | ||||||
|  | 
 | ||||||
|  |                     if 'access_token' in result: | ||||||
|  |                         _access_token = result['access_token'] | ||||||
|  | 
 | ||||||
|  |                     if not got_access: | ||||||
|  |                         # first time this loop runs we must indicate task is | ||||||
|  |                         # started, we have auth | ||||||
|  |                         got_access = True | ||||||
|  |                         task_status.started() | ||||||
|  | 
 | ||||||
|  |                 else: | ||||||
|  |                     await trio.sleep(renew_time / 2) | ||||||
|  | 
 | ||||||
|  |         n.start_soon(_recv_task) | ||||||
|  |         # if we have client creds launch auth loop | ||||||
|  |         if client._key_id is not None: | ||||||
|  |             await n.start(_auth_loop) | ||||||
|  | 
 | ||||||
|         await client.cache_symbols() |         await client.cache_symbols() | ||||||
|         yield client |         yield client | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | @acm | ||||||
|  | async def open_feed_handler(): | ||||||
|  |     fh = FeedHandler(config=get_config()) | ||||||
|  |     yield fh | ||||||
|  |     await to_asyncio.run_task(fh.stop_async) | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | @acm | ||||||
|  | async def maybe_open_feed_handler() -> trio.abc.ReceiveStream: | ||||||
|  |     async with maybe_open_context( | ||||||
|  |         acm_func=open_feed_handler, | ||||||
|  |         key='feedhandler', | ||||||
|  |     ) as (cache_hit, fh): | ||||||
|  |         yield fh | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | @acm | ||||||
|  | async def open_price_feed( | ||||||
|  |     instrument: str | ||||||
|  | ) -> trio.abc.ReceiveStream: | ||||||
|  | 
 | ||||||
|  |     # XXX: hangs when going into this ctx mngr | ||||||
|  |     async with maybe_open_feed_handler() as fh: | ||||||
|  | 
 | ||||||
|  |         async def relay( | ||||||
|  |             from_trio: asyncio.Queue, | ||||||
|  |             to_trio: trio.abc.SendChannel, | ||||||
|  |         ) -> None: | ||||||
|  |             async def _trade(data: dict, receipt_timestamp): | ||||||
|  |                 to_trio.send_nowait(('trade', { | ||||||
|  |                     'symbol': cb_sym_to_deribit_inst( | ||||||
|  |                         str_to_cb_sym(data.symbol)).lower(), | ||||||
|  |                     'last': data, | ||||||
|  |                     'broker_ts': time.time(), | ||||||
|  |                     'data': data.to_dict(), | ||||||
|  |                     'receipt': receipt_timestamp | ||||||
|  |                 })) | ||||||
|  | 
 | ||||||
|  |             async def _l1(data: dict, receipt_timestamp): | ||||||
|  |                 to_trio.send_nowait(('l1', { | ||||||
|  |                     'symbol': cb_sym_to_deribit_inst( | ||||||
|  |                         str_to_cb_sym(data.symbol)).lower(), | ||||||
|  |                     'ticks': [ | ||||||
|  |                         {'type': 'bid', | ||||||
|  |                             'price': float(data.bid_price), 'size': float(data.bid_size)}, | ||||||
|  |                         {'type': 'bsize', | ||||||
|  |                             'price': float(data.bid_price), 'size': float(data.bid_size)}, | ||||||
|  |                         {'type': 'ask', | ||||||
|  |                             'price': float(data.ask_price), 'size': float(data.ask_size)}, | ||||||
|  |                         {'type': 'asize', | ||||||
|  |                             'price': float(data.ask_price), 'size': float(data.ask_size)} | ||||||
|  |                     ] | ||||||
|  |                 })) | ||||||
|  | 
 | ||||||
|  |             fh.add_feed( | ||||||
|  |                 DERIBIT, | ||||||
|  |                 channels=[TRADES, L1_BOOK], | ||||||
|  |                 symbols=[instrument], | ||||||
|  |                 callbacks={ | ||||||
|  |                     TRADES: _trade, | ||||||
|  |                     L1_BOOK: _l1 | ||||||
|  |                 }) | ||||||
|  | 
 | ||||||
|  |             if not fh.running: | ||||||
|  |                 fh.run( | ||||||
|  |                     start_loop=False, | ||||||
|  |                     install_signal_handlers=False) | ||||||
|  | 
 | ||||||
|  |             # sync with trio | ||||||
|  |             to_trio.send_nowait(None) | ||||||
|  | 
 | ||||||
|  |             try: | ||||||
|  |                 await asyncio.sleep(float('inf')) | ||||||
|  | 
 | ||||||
|  |             except asyncio.exceptions.CancelledError: | ||||||
|  |                 ... | ||||||
|  | 
 | ||||||
|  |         async with to_asyncio.open_channel_from( | ||||||
|  |             relay | ||||||
|  |         ) as (first, chan): | ||||||
|  |             yield chan | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | @acm | ||||||
|  | async def maybe_open_price_feed( | ||||||
|  |     instrument: str | ||||||
|  | ) -> trio.abc.ReceiveStream: | ||||||
|  | 
 | ||||||
|  |     # TODO: add a predicate to maybe_open_context | ||||||
|  |     async with maybe_open_context( | ||||||
|  |         acm_func=open_price_feed, | ||||||
|  |         kwargs={ | ||||||
|  |             'instrument': instrument | ||||||
|  |         }, | ||||||
|  |         key=f'{instrument}-price', | ||||||
|  |     ) as (cache_hit, feed): | ||||||
|  |         if cache_hit: | ||||||
|  |             yield broadcast_receiver(feed, 10) | ||||||
|  |         else: | ||||||
|  |             yield feed | ||||||
|  | 
 | ||||||
|  | @acm | ||||||
|  | async def open_order_feed( | ||||||
|  |     instrument: List[str] | ||||||
|  | ) -> trio.abc.ReceiveStream: | ||||||
|  | 
 | ||||||
|  |     async with maybe_open_feed_handler() as fh: | ||||||
|  | 
 | ||||||
|  |         async def relay( | ||||||
|  |             from_trio: asyncio.Queue, | ||||||
|  |             to_trio: trio.abc.SendChannel, | ||||||
|  |         ) -> None: | ||||||
|  |             async def _fill(data: dict, receipt_timestamp): | ||||||
|  |                 breakpoint() | ||||||
|  | 
 | ||||||
|  |             async def _order_info(data: dict, receipt_timestamp): | ||||||
|  |                 breakpoint() | ||||||
|  | 
 | ||||||
|  |             fh.add_feed( | ||||||
|  |                 DERIBIT, | ||||||
|  |                 channels=[FILLS, ORDER_INFO], | ||||||
|  |                 symbols=[instrument], | ||||||
|  |                 callbacks={ | ||||||
|  |                     FILLS: _fill, | ||||||
|  |                     ORDER_INFO: _order_info, | ||||||
|  |                 }) | ||||||
|  | 
 | ||||||
|  |             if not fh.running: | ||||||
|  |                 fh.run( | ||||||
|  |                     start_loop=False, | ||||||
|  |                     install_signal_handlers=False) | ||||||
|  | 
 | ||||||
|  |             # sync with trio | ||||||
|  |             to_trio.send_nowait(None) | ||||||
|  | 
 | ||||||
|  |             try: | ||||||
|  |                 await asyncio.sleep(float('inf')) | ||||||
|  | 
 | ||||||
|  |             except asyncio.exceptions.CancelledError: | ||||||
|  |                 ... | ||||||
|  | 
 | ||||||
|  |         async with to_asyncio.open_channel_from( | ||||||
|  |             relay | ||||||
|  |         ) as (first, chan): | ||||||
|  |             yield chan | ||||||
|  | 
 | ||||||
|  | @acm | ||||||
|  | async def maybe_open_order_feed( | ||||||
|  |     instrument: str | ||||||
|  | ) -> trio.abc.ReceiveStream: | ||||||
|  | 
 | ||||||
|  |     # TODO: add a predicate to maybe_open_context | ||||||
|  |     async with maybe_open_context( | ||||||
|  |         acm_func=open_order_feed, | ||||||
|  |         kwargs={ | ||||||
|  |             'instrument': instrument | ||||||
|  |         }, | ||||||
|  |         key=f'{instrument}-order', | ||||||
|  |     ) as (cache_hit, feed): | ||||||
|  |         if cache_hit: | ||||||
|  |             yield broadcast_receiver(feed, 10) | ||||||
|  |         else: | ||||||
|  |             yield feed | ||||||
|  |  | ||||||
										
											Binary file not shown.
										
									
								
							| After Width: | Height: | Size: 169 KiB | 
										
											Binary file not shown.
										
									
								
							| After Width: | Height: | Size: 106 KiB | 
										
											Binary file not shown.
										
									
								
							| After Width: | Height: | Size: 59 KiB | 
										
											Binary file not shown.
										
									
								
							| After Width: | Height: | Size: 70 KiB | 
										
											Binary file not shown.
										
									
								
							| After Width: | Height: | Size: 132 KiB | 
|  | @ -0,0 +1,44 @@ | ||||||
|  | # piker: trading gear for hackers | ||||||
|  | # Copyright (C) Guillermo Rodriguez (in stewardship for piker0) | ||||||
|  | 
 | ||||||
|  | # This program is free software: you can redistribute it and/or modify | ||||||
|  | # it under the terms of the GNU Affero General Public License as published by | ||||||
|  | # the Free Software Foundation, either version 3 of the License, or | ||||||
|  | # (at your option) any later version. | ||||||
|  | 
 | ||||||
|  | # This program is distributed in the hope that it will be useful, | ||||||
|  | # but WITHOUT ANY WARRANTY; without even the implied warranty of | ||||||
|  | # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the | ||||||
|  | # GNU Affero General Public License for more details. | ||||||
|  | 
 | ||||||
|  | # You should have received a copy of the GNU Affero General Public License | ||||||
|  | # along with this program.  If not, see <https://www.gnu.org/licenses/>. | ||||||
|  | ''' | ||||||
|  | Order api and machinery | ||||||
|  | 
 | ||||||
|  | ''' | ||||||
|  | 
 | ||||||
|  | from typing import Any, AsyncIterator | ||||||
|  | 
 | ||||||
|  | import tractor | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | @tractor.context | ||||||
|  | async def trades_dialogue( | ||||||
|  |     ctx: tractor.Context, | ||||||
|  |     loglevel: str = None, | ||||||
|  | 
 | ||||||
|  | ) -> AsyncIterator[dict[str, Any]]: | ||||||
|  | 
 | ||||||
|  |     # XXX: required to propagate ``tractor`` loglevel to piker logging | ||||||
|  |     get_console_log(loglevel or tractor.current_actor().loglevel) | ||||||
|  | 
 | ||||||
|  |     async with open_cached_client('deribit') as client: | ||||||
|  |         if not client._key_id: | ||||||
|  |             raise RuntimeError('Missing Deribit API key in `brokers.toml`!?!?') | ||||||
|  | 
 | ||||||
|  |         acc_name = f'deribit.{client._key_id}' | ||||||
|  | 
 | ||||||
|  |         await client.cache_symbols() | ||||||
|  | 
 | ||||||
|  |         breakpoint() | ||||||
|  | @ -1,3 +1,6 @@ | ||||||
|  | # piker: trading gear for hackers | ||||||
|  | # Copyright (C) Guillermo Rodriguez (in stewardship for piker0) | ||||||
|  | 
 | ||||||
| # This program is free software: you can redistribute it and/or modify | # This program is free software: you can redistribute it and/or modify | ||||||
| # it under the terms of the GNU Affero General Public License as published by | # it under the terms of the GNU Affero General Public License as published by | ||||||
| # the Free Software Foundation, either version 3 of the License, or | # the Free Software Foundation, either version 3 of the License, or | ||||||
|  | @ -15,9 +18,6 @@ | ||||||
| Deribit backend. | Deribit backend. | ||||||
| 
 | 
 | ||||||
| ''' | ''' | ||||||
| 
 |  | ||||||
| import asyncio |  | ||||||
| from async_generator import aclosing |  | ||||||
| from contextlib import asynccontextmanager as acm | from contextlib import asynccontextmanager as acm | ||||||
| from datetime import datetime | from datetime import datetime | ||||||
| from typing import Any, Optional, List, Callable | from typing import Any, Optional, List, Callable | ||||||
|  | @ -29,9 +29,7 @@ import pendulum | ||||||
| from fuzzywuzzy import process as fuzzy | from fuzzywuzzy import process as fuzzy | ||||||
| import numpy as np | import numpy as np | ||||||
| import tractor | import tractor | ||||||
| from tractor import to_asyncio |  | ||||||
| 
 | 
 | ||||||
| from piker import config |  | ||||||
| from piker._cacheables import open_cached_client | from piker._cacheables import open_cached_client | ||||||
| from piker.log import get_logger, get_console_log | from piker.log import get_logger, get_console_log | ||||||
| from piker.data import ShmArray | from piker.data import ShmArray | ||||||
|  | @ -47,170 +45,21 @@ from cryptofeed.defines import ( | ||||||
| ) | ) | ||||||
| from cryptofeed.symbols import Symbol | from cryptofeed.symbols import Symbol | ||||||
| 
 | 
 | ||||||
| from .api import Client, Trade | from .api import ( | ||||||
|  |     Client, Trade, | ||||||
|  |     get_config, | ||||||
|  |     str_to_cb_sym, piker_sym_to_cb_sym, cb_sym_to_deribit_inst, | ||||||
|  |     maybe_open_price_feed | ||||||
|  | ) | ||||||
| 
 | 
 | ||||||
| _spawn_kwargs = { | _spawn_kwargs = { | ||||||
|     'infect_asyncio': True, |     'infect_asyncio': True, | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| def get_config() -> dict[str, Any]: |  | ||||||
| 
 |  | ||||||
|     conf, path = config.load() |  | ||||||
| 
 |  | ||||||
|     section = conf.get('deribit') |  | ||||||
| 
 |  | ||||||
|     if section is None: |  | ||||||
|         log.warning(f'No config section found for deribit in {path}') |  | ||||||
|         return {} |  | ||||||
| 
 |  | ||||||
|     conf['log'] = {} |  | ||||||
|     conf['log']['disabled'] = True |  | ||||||
| 
 |  | ||||||
| #    conf['log']['filename'] = '/tmp/feedhandler.log' |  | ||||||
| #    conf['log']['level'] = 'WARNING' |  | ||||||
| 
 |  | ||||||
|     return conf  |  | ||||||
| 
 |  | ||||||
| 
 |  | ||||||
| log = get_logger(__name__) | log = get_logger(__name__) | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| _url = 'https://www.deribit.com' |  | ||||||
| 
 |  | ||||||
| 
 |  | ||||||
| def str_to_cb_sym(name: str) -> Symbol: |  | ||||||
|     base, strike_price, expiry_date, option_type = name.split('-') |  | ||||||
| 
 |  | ||||||
|     quote = base |  | ||||||
| 
 |  | ||||||
|     if option_type == 'put': |  | ||||||
|         option_type = PUT  |  | ||||||
|     elif option_type  == 'call': |  | ||||||
|         option_type = CALL |  | ||||||
|     else: |  | ||||||
|         raise BaseException("Couldn\'t parse option type") |  | ||||||
| 
 |  | ||||||
|     return Symbol( |  | ||||||
|         base, quote, |  | ||||||
|         type=OPTION, |  | ||||||
|         strike_price=strike_price, |  | ||||||
|         option_type=option_type, |  | ||||||
|         expiry_date=expiry_date, |  | ||||||
|         expiry_normalize=False) |  | ||||||
| 
 |  | ||||||
| 
 |  | ||||||
| 
 |  | ||||||
| def piker_sym_to_cb_sym(name: str) -> Symbol: |  | ||||||
|     base, expiry_date, strike_price, option_type = tuple( |  | ||||||
|         name.upper().split('-')) |  | ||||||
| 
 |  | ||||||
|     quote = base |  | ||||||
| 
 |  | ||||||
|     if option_type == 'P': |  | ||||||
|         option_type = PUT  |  | ||||||
|     elif option_type  == 'C': |  | ||||||
|         option_type = CALL |  | ||||||
|     else: |  | ||||||
|         raise BaseException("Couldn\'t parse option type") |  | ||||||
| 
 |  | ||||||
|     return Symbol( |  | ||||||
|         base, quote, |  | ||||||
|         type=OPTION, |  | ||||||
|         strike_price=strike_price, |  | ||||||
|         option_type=option_type, |  | ||||||
|         expiry_date=expiry_date.upper()) |  | ||||||
| 
 |  | ||||||
| 
 |  | ||||||
| def cb_sym_to_deribit_inst(sym: Symbol): |  | ||||||
|     # cryptofeed normalized |  | ||||||
|     cb_norm = ['F', 'G', 'H', 'J', 'K', 'M', 'N', 'Q', 'U', 'V', 'X', 'Z'] |  | ||||||
| 
 |  | ||||||
|     # deribit specific  |  | ||||||
|     months = ['JAN', 'FEB', 'MAR', 'APR', 'MAY', 'JUN', 'JUL', 'AUG', 'SEP', 'OCT', 'NOV', 'DEC'] |  | ||||||
|      |  | ||||||
|     exp = sym.expiry_date |  | ||||||
| 
 |  | ||||||
|     # YYMDD |  | ||||||
|     # 01234 |  | ||||||
|     year, month, day = ( |  | ||||||
|         exp[:2], months[cb_norm.index(exp[2:3])], exp[3:]) |  | ||||||
| 
 |  | ||||||
|     otype = 'C' if sym.option_type == CALL else 'P' |  | ||||||
| 
 |  | ||||||
|     return f'{sym.base}-{day}{month}{year}-{sym.strike_price}-{otype}' |  | ||||||
| 
 |  | ||||||
| 
 |  | ||||||
| # inside here we are in an asyncio context |  | ||||||
| async def open_aio_cryptofeed_relay( |  | ||||||
|     from_trio: asyncio.Queue, |  | ||||||
|     to_trio: trio.abc.SendChannel, |  | ||||||
|     instruments: List[str] = [] |  | ||||||
| ) -> None: |  | ||||||
| 
 |  | ||||||
|     instruments = [piker_sym_to_cb_sym(i) for i in instruments] |  | ||||||
| 
 |  | ||||||
|     async def trade_cb(data: dict, receipt_timestamp): |  | ||||||
|         to_trio.send_nowait(('trade', { |  | ||||||
|             'symbol': cb_sym_to_deribit_inst( |  | ||||||
|                 str_to_cb_sym(data.symbol)).lower(), |  | ||||||
|             'last': data, |  | ||||||
|             'broker_ts': time.time(), |  | ||||||
|             'data': data.to_dict(), |  | ||||||
|             'receipt': receipt_timestamp |  | ||||||
|         })) |  | ||||||
| 
 |  | ||||||
|     async def l1_book_cb(data: dict, receipt_timestamp): |  | ||||||
|         to_trio.send_nowait(('l1', { |  | ||||||
|             'symbol': cb_sym_to_deribit_inst( |  | ||||||
|                 str_to_cb_sym(data.symbol)).lower(), |  | ||||||
|             'ticks': [ |  | ||||||
|                 {'type': 'bid', |  | ||||||
|                     'price': float(data.bid_price), 'size': float(data.bid_size)}, |  | ||||||
|                 {'type': 'bsize', |  | ||||||
|                     'price': float(data.bid_price), 'size': float(data.bid_size)}, |  | ||||||
|                 {'type': 'ask', |  | ||||||
|                     'price': float(data.ask_price), 'size': float(data.ask_size)}, |  | ||||||
|                 {'type': 'asize', |  | ||||||
|                     'price': float(data.ask_price), 'size': float(data.ask_size)} |  | ||||||
|             ] |  | ||||||
|         })) |  | ||||||
| 
 |  | ||||||
|     fh = FeedHandler(config=get_config()) |  | ||||||
|     fh.run(start_loop=False) |  | ||||||
| 
 |  | ||||||
|     fh.add_feed( |  | ||||||
|         DERIBIT, |  | ||||||
|         channels=[L1_BOOK], |  | ||||||
|         symbols=instruments, |  | ||||||
|         callbacks={L1_BOOK: l1_book_cb}) |  | ||||||
| 
 |  | ||||||
|     fh.add_feed( |  | ||||||
|         DERIBIT, |  | ||||||
|         channels=[TRADES], |  | ||||||
|         symbols=instruments, |  | ||||||
|         callbacks={TRADES: trade_cb}) |  | ||||||
| 
 |  | ||||||
|     # sync with trio |  | ||||||
|     to_trio.send_nowait(None) |  | ||||||
| 
 |  | ||||||
|     await asyncio.sleep(float('inf')) |  | ||||||
| 
 |  | ||||||
| 
 |  | ||||||
| @acm |  | ||||||
| async def open_cryptofeeds( |  | ||||||
| 
 |  | ||||||
|     instruments: List[str] |  | ||||||
| 
 |  | ||||||
| ) -> trio.abc.ReceiveStream: |  | ||||||
| 
 |  | ||||||
|     async with to_asyncio.open_channel_from( |  | ||||||
|         open_aio_cryptofeed_relay, |  | ||||||
|         instruments=instruments, |  | ||||||
|     ) as (first, chan): |  | ||||||
|         yield chan |  | ||||||
| 
 |  | ||||||
| 
 |  | ||||||
| @acm | @acm | ||||||
| async def open_history_client( | async def open_history_client( | ||||||
|     instrument: str, |     instrument: str, | ||||||
|  | @ -278,9 +127,7 @@ async def stream_quotes( | ||||||
| 
 | 
 | ||||||
|     async with ( |     async with ( | ||||||
|         open_cached_client('deribit') as client, |         open_cached_client('deribit') as client, | ||||||
|         send_chan as send_chan, |         send_chan as send_chan | ||||||
|         trio.open_nursery() as n, |  | ||||||
|         open_cryptofeeds(symbols) as stream  |  | ||||||
|     ): |     ): | ||||||
| 
 | 
 | ||||||
|         init_msgs = { |         init_msgs = { | ||||||
|  | @ -298,11 +145,22 @@ async def stream_quotes( | ||||||
| 
 | 
 | ||||||
|         nsym = piker_sym_to_cb_sym(sym) |         nsym = piker_sym_to_cb_sym(sym) | ||||||
| 
 | 
 | ||||||
|         # keep client cached for real-time section |         async with maybe_open_price_feed(sym) as stream: | ||||||
|  | 
 | ||||||
|             cache = await client.cache_symbols() |             cache = await client.cache_symbols() | ||||||
| 
 | 
 | ||||||
|         last_trade = Trade(**(await client.last_trades( |             last_trades = (await client.last_trades( | ||||||
|             cb_sym_to_deribit_inst(nsym), count=1)).trades[0]) |                 cb_sym_to_deribit_inst(nsym), count=1)).trades | ||||||
|  | 
 | ||||||
|  |             if len(last_trades) == 0: | ||||||
|  |                 last_trade = None | ||||||
|  |                 async for typ, quote in stream: | ||||||
|  |                     if typ == 'trade': | ||||||
|  |                         last_trade = Trade(**(quote['data'])) | ||||||
|  |                         break | ||||||
|  | 
 | ||||||
|  |             else: | ||||||
|  |                 last_trade = Trade(**(last_trades[0])) | ||||||
| 
 | 
 | ||||||
|             first_quote = { |             first_quote = { | ||||||
|                 'symbol': sym, |                 'symbol': sym, | ||||||
|  | @ -317,7 +175,6 @@ async def stream_quotes( | ||||||
|             } |             } | ||||||
|             task_status.started((init_msgs,  first_quote)) |             task_status.started((init_msgs,  first_quote)) | ||||||
| 
 | 
 | ||||||
|         async with aclosing(stream): |  | ||||||
|             feed_is_live.set() |             feed_is_live.set() | ||||||
| 
 | 
 | ||||||
|             async for typ, quote in stream: |             async for typ, quote in stream: | ||||||
|  | @ -338,15 +195,6 @@ async def open_symbol_search( | ||||||
|         async with ctx.open_stream() as stream: |         async with ctx.open_stream() as stream: | ||||||
| 
 | 
 | ||||||
|             async for pattern in stream: |             async for pattern in stream: | ||||||
|                 # results = await client.symbol_info(sym=pattern.upper()) |  | ||||||
| 
 |  | ||||||
|                 matches = fuzzy.extractBests( |  | ||||||
|                     pattern, |  | ||||||
|                     cache, |  | ||||||
|                     score_cutoff=30, |  | ||||||
|                 ) |  | ||||||
|                 # repack in dict form |                 # repack in dict form | ||||||
|                 await stream.send( |                 await stream.send( | ||||||
|                     {item[0]['instrument_name']: item[0] |                     await client.search_symbols(pattern)) | ||||||
|                      for item in matches} |  | ||||||
|                 ) |  | ||||||
|  |  | ||||||
|  | @ -20,7 +20,7 @@ ToOlS fOr CoPInG wITh "tHE wEB" protocols. | ||||||
| """ | """ | ||||||
| from contextlib import asynccontextmanager, AsyncExitStack | from contextlib import asynccontextmanager, AsyncExitStack | ||||||
| from types import ModuleType | from types import ModuleType | ||||||
| from typing import Any, Callable, AsyncGenerator | from typing import Any, Optional, Callable, AsyncGenerator | ||||||
| import json | import json | ||||||
| 
 | 
 | ||||||
| import trio | import trio | ||||||
|  | @ -54,8 +54,8 @@ class NoBsWs: | ||||||
|         self, |         self, | ||||||
|         url: str, |         url: str, | ||||||
|         stack: AsyncExitStack, |         stack: AsyncExitStack, | ||||||
|         fixture: Callable, |         fixture: Optional[Callable] = None, | ||||||
|         serializer: ModuleType = json, |         serializer: ModuleType = json | ||||||
|     ): |     ): | ||||||
|         self.url = url |         self.url = url | ||||||
|         self.fixture = fixture |         self.fixture = fixture | ||||||
|  | @ -80,6 +80,8 @@ class NoBsWs: | ||||||
|                 self._ws = await self._stack.enter_async_context( |                 self._ws = await self._stack.enter_async_context( | ||||||
|                     trio_websocket.open_websocket_url(self.url) |                     trio_websocket.open_websocket_url(self.url) | ||||||
|                 ) |                 ) | ||||||
|  | 
 | ||||||
|  |                 if self.fixture is not None: | ||||||
|                     # rerun user code fixture |                     # rerun user code fixture | ||||||
|                     ret = await self._stack.enter_async_context( |                     ret = await self._stack.enter_async_context( | ||||||
|                         self.fixture(self) |                         self.fixture(self) | ||||||
|  | @ -121,13 +123,19 @@ class NoBsWs: | ||||||
|             except self.recon_errors: |             except self.recon_errors: | ||||||
|                 await self._connect() |                 await self._connect() | ||||||
| 
 | 
 | ||||||
|  |     def __aiter__(self): | ||||||
|  |         return self | ||||||
|  | 
 | ||||||
|  |     async def __anext__(self): | ||||||
|  |         return await self.recv_msg() | ||||||
|  | 
 | ||||||
| 
 | 
 | ||||||
| @asynccontextmanager | @asynccontextmanager | ||||||
| async def open_autorecon_ws( | async def open_autorecon_ws( | ||||||
|     url: str, |     url: str, | ||||||
| 
 | 
 | ||||||
|     # TODO: proper type annot smh |     # TODO: proper type annot smh | ||||||
|     fixture: Callable, |     fixture: Optional[Callable] = None, | ||||||
| 
 | 
 | ||||||
| ) -> AsyncGenerator[tuple[...],  NoBsWs]: | ) -> AsyncGenerator[tuple[...],  NoBsWs]: | ||||||
|     """Apparently we can QoS for all sorts of reasons..so catch em. |     """Apparently we can QoS for all sorts of reasons..so catch em. | ||||||
|  |  | ||||||
|  | @ -20,4 +20,4 @@ | ||||||
| -e git+https://github.com/pikers/asyncvnc.git@main#egg=asyncvnc | -e git+https://github.com/pikers/asyncvnc.git@main#egg=asyncvnc | ||||||
| 
 | 
 | ||||||
| # ``cryptofeed`` for connecting to various crypto exchanges + custom fixes | # ``cryptofeed`` for connecting to various crypto exchanges + custom fixes | ||||||
| -e git+https://github.com/guilledk/cryptofeed.git@date_parsing#egg=cryptofeed | -e git+https://github.com/pikers/cryptofeed.git@date_parsing#egg=cryptofeed | ||||||
|  |  | ||||||
		Loading…
	
		Reference in New Issue