Compare commits


No commits in common. "a117177759faf5af3475956d01c9c8cfa1d0114d" and "45788b0b53f699169ca4c4d75f8d738935629489" have entirely different histories.

3 changed files with 168 additions and 164 deletions

View File

@ -62,11 +62,6 @@ from .venues import (
from piker.accounting import (
@ -100,6 +95,55 @@ _ws_url = 'wss://'
_testnet_ws_url = 'wss://'
class JSONRPCResult(Struct):
id: int
usIn: int
usOut: int
usDiff: int
testnet: bool
jsonrpc: str = '2.0'
result: Optional[list[dict]] = None
error: Optional[dict] = None
class JSONRPCChannel(Struct):
method: str
params: dict
jsonrpc: str = '2.0'
class KLinesResult(Struct):
close: list[float]
cost: list[float]
high: list[float]
low: list[float]
open: list[float]
status: str
ticks: list[int]
volume: list[float]
class Trade(Struct):
trade_seq: int
trade_id: str
timestamp: int
tick_direction: int
price: float
mark_price: float
iv: float
instrument_name: str
index_price: float
direction: str
contracts: float
amount: float
combo_trade_id: Optional[int] = 0,
combo_id: Optional[str] = '',
block_trade_leg_count: Optional[int] = 0,
block_trade_id: Optional[str] = '',
class LastTradesResult(Struct):
trades: list[Trade]
has_more: bool
# convert datetime obj timestamp to unixtime in milliseconds
def deribit_timestamp(when):
return int((when.timestamp() * 1000) + (when.microsecond / 1000))
@ -236,39 +280,6 @@ class Client:
self.json_rpc = json_rpc
self._auth_ts = None
self._auth_renew_ts = 5 # seconds to renew auth
async def _json_rpc_auth_wrapper(self, *args, **kwargs) -> JSONRPCResult:
"""Background task that adquires a first access token and then will
refresh the access token.
access_scope = 'trade:read_write'
current_ts = time.time()
if not self._auth_ts or current_ts - self._auth_ts < self._auth_renew_ts:
# if we are close to token expiry time
params = {
'grant_type': 'client_credentials',
'client_id': self._key_id,
'client_secret': self._key_secret,
'scope': access_scope
resp = await self.json_rpc('public/auth', params)
result = resp.result
self._auth_ts = time.time() + result['expires_in']
return await self.json_rpc(*args, **kwargs)
async def get_balances(
kind: str = 'option'
@ -279,7 +290,7 @@ class Client:
balances = {}
for currency in self.currencies:
resp = await self._json_rpc_auth_wrapper(
resp = await self.json_rpc(
'private/get_positions', params={
'currency': currency.upper(),
'kind': kind})
@ -297,7 +308,7 @@ class Client:
by symbol.
assets = {}
resp = await self._json_rpc_auth_wrapper(
resp = await self.json_rpc(
@ -310,15 +321,6 @@ class Client:
instruments = await self.symbol_info(currency=name)
for instrument in instruments:
pair = instruments[instrument]
assets[pair.symbol] = Asset(
return assets
async def get_mkt_pairs(self) -> dict[str, Pair]:
@ -344,7 +346,7 @@ class Client:
'type': 'limit',
'price': price,
resp = await self._json_rpc_auth_wrapper(
resp = await self.json_rpc(
f'private/{action}', params)
return resp.result
@ -352,7 +354,7 @@ class Client:
async def submit_cancel(self, oid: str):
"""Send cancel request for order id
resp = await self._json_rpc_auth_wrapper(
resp = await self.json_rpc(
'private/cancel', {'order_id': oid})
return resp.result
@ -360,7 +362,7 @@ class Client:
sym: str | None = None,
venue: MarketType = 'option',
venue: MarketType | None = None,
expiry: str | None = None,
) -> dict[str, Pair] | Pair:
@ -374,7 +376,7 @@ class Client:
return cached_pair
if sym:
return pair_table[sym]
return pair_table[sym.lower()]
return self._pairs
@ -400,7 +402,7 @@ class Client:
'expired': str(expired).lower()
resp: JSONRPCResult = await self._json_rpc_auth_wrapper(
resp: JSONRPCResult = await self.json_rpc(
@ -433,38 +435,10 @@ class Client:
async def cache_symbols(
venue: MarketType = 'option',
) -> dict:
) -> None:
# lookup internal mkt-specific pair table to update
pair_table: dict[str, Pair] = self._pairs
# make API request(s)
mkt_pairs = await self.symbol_info()
if not mkt_pairs:
raise SymbolNotFound(f'No market pairs found!?:\n{resp}')
pairs_view_subtable: dict[str, Pair] = {}
for instrument in mkt_pairs:
pair_type: Type = PAIRTYPES[venue]
pair: Pair = pair_type(**mkt_pairs[instrument].to_dict())
except Exception as e:
"\nDon't panic, prolly stupid binance changed their symbology schema again..\n"
'Check out their API docs here:\n\n'
pair_table[pair.symbol.upper()] = pair
# update an additional top-level-cross-venue-table
# `._pairs: ChainMap` for search B0
pairs_view_subtable[pair.bs_fqme] = pair
if not self._pairs:
self._pairs = await self.symbol_info()
return self._pairs
@ -497,7 +471,7 @@ class Client:
as_np: bool = True,
) -> list[tuple] | np.ndarray:
instrument: str = mkt.bs_fqme.split('.')[0]
instrument: str = mkt.bs_mktid
if end_dt is None:
end_dt = now('UTC')
@ -510,7 +484,7 @@ class Client:
end_time = deribit_timestamp(end_dt)
resp = await self._json_rpc_auth_wrapper(
resp = await self.json_rpc(
'instrument_name': instrument.upper(),
@ -547,7 +521,7 @@ class Client:
instrument: str,
count: int = 10
resp = await self._json_rpc_auth_wrapper(
resp = await self.json_rpc(
'instrument_name': instrument,
@ -559,8 +533,7 @@ class Client:
async def get_client(
is_brokercheck: bool = False,
venue: MarketType = 'option',
is_brokercheck: bool = False
) -> Client:
async with (
@ -570,6 +543,68 @@ async def get_client(
) as json_rpc
client = Client(json_rpc)
_refresh_token: Optional[str] = None
_access_token: Optional[str] = None
async def _auth_loop(
task_status: TaskStatus = trio.TASK_STATUS_IGNORED
"""Background task that adquires a first access token and then will
refresh the access token while the nursery isn't cancelled.
renew_time = 10
access_scope = 'trade:read_write'
_expiry_time = time.time()
got_access = False
nonlocal _refresh_token
nonlocal _access_token
while True:
if time.time() - _expiry_time < renew_time:
# if we are close to token expiry time
if _refresh_token != None:
# if we have a refresh token already dont need to send
# secret
params = {
'grant_type': 'refresh_token',
'refresh_token': _refresh_token,
'scope': access_scope
# we don't have refresh token, send secret to initialize
params = {
'grant_type': 'client_credentials',
'client_id': client._key_id,
'client_secret': client._key_secret,
'scope': access_scope
resp = await json_rpc('public/auth', params)
result = resp.result
_expiry_time = time.time() + result['expires_in']
_refresh_token = result['refresh_token']
if 'access_token' in result:
_access_token = result['access_token']
if not got_access:
# first time this loop runs we must indicate task is
# started, we have auth
got_access = True
await trio.sleep(renew_time / 2)
# if we have client creds launch auth loop
if client._key_id is not None:
await n.start(_auth_loop)
await client.cache_symbols()
yield client

View File

@ -160,26 +160,38 @@ async def get_mkt_info(
assets: dict[str, Asset] = await client.get_assets()
pair_str: str = mkt_ep.lower()
# switch venue-mode depending on input pattern parsing
# since we want to use a particular endpoint (set) for
# pair info lookup!
client.mkt_mode = mkt_mode
pair: Pair = await client.exch_info(
mkt_mode = pair.venue
client.mkt_mode = mkt_mode
dst: Asset | None = assets.get(pair.bs_dst_asset)
src: Asset | None = assets.get(pair.bs_src_asset)
if (
not dst
# TODO: a known asset DNE list?
# and pair.baseAsset == 'DEFI'
f'UNKNOWN {venue} asset {pair.base_currency} from,\n'
# XXX UNKNOWN missing "asset", though no idea why?
# maybe it's only avail in the margin venue(s): /dapi/ ?
return None
mkt = MktPair(
return mkt, pair
@ -198,7 +210,7 @@ async def stream_quotes(
# XXX: required to propagate ``tractor`` loglevel to piker logging
get_console_log(loglevel or tractor.current_actor().loglevel)
sym = symbols[0].split('.')[0]
sym = symbols[0]
init_msgs: list[FeedInit] = []
@ -213,11 +225,11 @@ async def stream_quotes(
nsym = piker_sym_to_cb_sym(sym)
nsym = piker_sym_to_cb_sym(sym.split('.')[0])
async with maybe_open_price_feed(sym) as stream:
cache = client._pairs
cache = await client.cache_symbols()
last_trades = (await client.last_trades(
cb_sym_to_deribit_inst(nsym), count=1)).trades
@ -259,7 +271,7 @@ async def open_symbol_search(
async with open_cached_client('deribit') as client:
# load all symbols locally for fast search
cache = client._pairs
cache = await client.cache_symbols()
await ctx.started()
async with ctx.open_stream() as stream:

View File

@ -19,7 +19,6 @@ Per market data-type definitions and schemas types.
from __future__ import annotations
import pendulum
from typing import (
@ -67,27 +66,29 @@ class Pair(Struct, frozen=True, kw_only=True):
# dst
base_currency: str # "BTC",
tick_size: float # 0.0001 # [{'above_price': 0.005, 'tick_size': 0.0005}]
tick_size_steps: list[dict[str, float]]
tick_size: float # 0.0001
tick_size_steps: list[dict[str, str | int | float]] # [{'above_price': 0.005, 'tick_size': 0.0005}]
def price_tick(self) -> Decimal:
return Decimal(str(self.tick_size_steps[0]['above_price']))
step_size: float = self.tick_size_steps[0].get('above_price')
return Decimal(step_size)
def size_tick(self) -> Decimal:
return Decimal(str(self.tick_size))
step_size: float = self.tick_size_steps[0].get('tick_size')
return Decimal(step_size)
def bs_fqme(self) -> str:
return f'{self.symbol}'
return self.symbol
def bs_mktid(self) -> str:
return f'{self.symbol}.{self.venue}'
return f'{self.symbol}'
class OptionPair(Pair, frozen=True):
class OptionPair(Pair, frozen=True, kw_only=True):
taker_commission: float # 0.0003
strike: float # 5000.0
@ -117,16 +118,17 @@ class OptionPair(Pair, frozen=True):
def expiry(self) -> str:
iso_date = pendulum.from_timestamp(self.expiration_timestamp / 1000).isoformat()
return iso_date
symbol: str = self.instrument_name.lower()
pair, expiry, strike_price, otype = symbol.split('-')
return f'{expiry}'
def venue(self) -> str:
return 'option'
return 'OPTION'
def bs_fqme(self) -> str:
return f'{self.symbol}'
return f'{self.symbol}.{self.venue}.{self.expiry}'
def bs_src_asset(self) -> str:
@ -134,58 +136,13 @@ class OptionPair(Pair, frozen=True):
def bs_dst_asset(self) -> str:
return f'{self.base_currency}'
def bs_mktid(self) -> str:
return f'{self.symbol}'
PAIRTYPES: dict[MarketType, Pair] = {
'option': OptionPair,
class JSONRPCResult(Struct):
id: int
usIn: int
usOut: int
usDiff: int
testnet: bool
jsonrpc: str = '2.0'
error: Optional[dict] = None
result: Optional[list[dict]] = None
class JSONRPCChannel(Struct):
method: str
params: dict
jsonrpc: str = '2.0'
class KLinesResult(Struct):
low: list[float]
cost: list[float]
high: list[float]
open: list[float]
close: list[float]
ticks: list[int]
status: str
volume: list[float]
class Trade(Struct):
iv: float
price: float
amount: float
trade_id: str
contracts: float
direction: str
trade_seq: int
timestamp: int
mark_price: float
index_price: float
tick_direction: int
instrument_name: str
combo_id: Optional[str] = '',
combo_trade_id: Optional[int] = 0,
block_trade_id: Optional[str] = '',
block_trade_leg_count: Optional[int] = 0,
class LastTradesResult(Struct):
trades: list[Trade]
has_more: bool