Compare commits
17 Commits
c65929bfe7
...
3dcee16bf6
Author | SHA1 | Date |
---|---|---|
|
3dcee16bf6 | |
|
1f41b151d7 | |
|
e8c196fd88 | |
|
7cefb202fb | |
|
ddb4c0269f | |
|
139f62f4de | |
|
338e292002 | |
|
13af0a90eb | |
|
e84781ca1e | |
|
576b15e2c6 | |
|
b7e54571ea | |
|
1a295c0c21 | |
|
f057a20bfa | |
|
9dba47902e | |
|
e0ecef04bb | |
|
266347bcdb | |
|
051d43b559 |
|
@ -58,13 +58,20 @@ from cryptofeed.symbols import Symbol
|
|||
# types for managing the cb callbacks.
|
||||
# from cryptofeed.types import L1Book
|
||||
from .venues import (
|
||||
_ws_url,
|
||||
MarketType,
|
||||
PAIRTYPES,
|
||||
Pair,
|
||||
OptionPair,
|
||||
JSONRPCResult,
|
||||
JSONRPCChannel,
|
||||
KLinesResult,
|
||||
Trade,
|
||||
LastTradesResult,
|
||||
)
|
||||
from piker.accounting import (
|
||||
Asset,
|
||||
digits_to_dec,
|
||||
MktPair,
|
||||
)
|
||||
from piker.data import (
|
||||
|
@ -89,60 +96,6 @@ _spawn_kwargs = {
|
|||
}
|
||||
|
||||
|
||||
_url = 'https://www.deribit.com'
|
||||
_ws_url = 'wss://www.deribit.com/ws/api/v2'
|
||||
_testnet_ws_url = 'wss://test.deribit.com/ws/api/v2'
|
||||
|
||||
|
||||
class JSONRPCResult(Struct):
|
||||
id: int
|
||||
usIn: int
|
||||
usOut: int
|
||||
usDiff: int
|
||||
testnet: bool
|
||||
jsonrpc: str = '2.0'
|
||||
result: Optional[list[dict]] = None
|
||||
error: Optional[dict] = None
|
||||
|
||||
class JSONRPCChannel(Struct):
|
||||
method: str
|
||||
params: dict
|
||||
jsonrpc: str = '2.0'
|
||||
|
||||
|
||||
class KLinesResult(Struct):
|
||||
close: list[float]
|
||||
cost: list[float]
|
||||
high: list[float]
|
||||
low: list[float]
|
||||
open: list[float]
|
||||
status: str
|
||||
ticks: list[int]
|
||||
volume: list[float]
|
||||
|
||||
class Trade(Struct):
|
||||
trade_seq: int
|
||||
trade_id: str
|
||||
timestamp: int
|
||||
tick_direction: int
|
||||
price: float
|
||||
mark_price: float
|
||||
iv: float
|
||||
instrument_name: str
|
||||
index_price: float
|
||||
direction: str
|
||||
contracts: float
|
||||
amount: float
|
||||
combo_trade_id: Optional[int] = 0,
|
||||
combo_id: Optional[str] = '',
|
||||
block_trade_leg_count: Optional[int] = 0,
|
||||
block_trade_id: Optional[str] = '',
|
||||
|
||||
class LastTradesResult(Struct):
|
||||
trades: list[Trade]
|
||||
has_more: bool
|
||||
|
||||
|
||||
# convert datetime obj timestamp to unixtime in milliseconds
|
||||
def deribit_timestamp(when):
|
||||
return int((when.timestamp() * 1000) + (when.microsecond / 1000))
|
||||
|
@ -233,34 +186,22 @@ def get_config() -> dict[str, Any]:
|
|||
)
|
||||
section: dict = {}
|
||||
section = conf.get('deribit')
|
||||
|
||||
section['log'] = {}
|
||||
section['log']['filename'] = 'feedhandler.log'
|
||||
section['log']['level'] = 'DEBUG'
|
||||
section['log']['disabled'] = True
|
||||
|
||||
if section is None:
|
||||
log.warning(f'No config section found for deribit in {path}')
|
||||
return {}
|
||||
|
||||
conf_option = section.get('option', {})
|
||||
section.clear # clear the dict to reuse it
|
||||
section['deribit'] = {}
|
||||
section['deribit']['key_id'] = conf_option.get('api_key')
|
||||
section['deribit']['key_secret'] = conf_option.get('api_secret')
|
||||
|
||||
section['log'] = {}
|
||||
section['log']['filename'] = 'feedhandler.log'
|
||||
section['log']['level'] = 'DEBUG'
|
||||
|
||||
return section
|
||||
|
||||
def get_fh_config() -> dict[str, Any]:
|
||||
conf_option = get_config().get('option', {})
|
||||
conf_log = get_config().get('log', {})
|
||||
|
||||
return {
|
||||
'log': {
|
||||
'filename': conf_log.get('filename'),
|
||||
'level': conf_log.get('level'),
|
||||
'disabled': conf_log.get('disabled')
|
||||
},
|
||||
'deribit': {
|
||||
'key_id': conf_option.get('api_key'),
|
||||
'key_secret': conf_option.get('api_secret')
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
class Client:
|
||||
|
||||
|
@ -272,16 +213,45 @@ class Client:
|
|||
) -> None:
|
||||
self._pairs: ChainMap[str, Pair] = ChainMap()
|
||||
|
||||
config = get_config().get('option', {})
|
||||
config = get_config().get('deribit', {})
|
||||
|
||||
self._key_id = config.get('api_key')
|
||||
self._key_secret = config.get('api_secret')
|
||||
self._key_id = config.get('key_id')
|
||||
self._key_secret = config.get('key_secret')
|
||||
|
||||
self.json_rpc = json_rpc
|
||||
|
||||
@property
|
||||
def currencies(self):
|
||||
return ['btc', 'eth', 'sol', 'usd']
|
||||
self._auth_ts = None
|
||||
self._auth_renew_ts = 5 # seconds to renew auth
|
||||
|
||||
async def _json_rpc_auth_wrapper(self, *args, **kwargs) -> JSONRPCResult:
|
||||
|
||||
"""Background task that adquires a first access token and then will
|
||||
refresh the access token.
|
||||
|
||||
https://docs.deribit.com/?python#authentication-2
|
||||
"""
|
||||
access_scope = 'trade:read_write'
|
||||
current_ts = time.time()
|
||||
|
||||
if not self._auth_ts or current_ts - self._auth_ts < self._auth_renew_ts:
|
||||
# if we are close to token expiry time
|
||||
|
||||
params = {
|
||||
'grant_type': 'client_credentials',
|
||||
'client_id': self._key_id,
|
||||
'client_secret': self._key_secret,
|
||||
'scope': access_scope
|
||||
}
|
||||
|
||||
resp = await self.json_rpc('public/auth', params)
|
||||
result = resp.result
|
||||
|
||||
self._auth_ts = time.time() + result['expires_in']
|
||||
|
||||
return await self.json_rpc(*args, **kwargs)
|
||||
|
||||
|
||||
|
||||
|
||||
async def get_balances(
|
||||
self,
|
||||
|
@ -293,7 +263,7 @@ class Client:
|
|||
balances = {}
|
||||
|
||||
for currency in self.currencies:
|
||||
resp = await self.json_rpc(
|
||||
resp = await self._json_rpc_auth_wrapper(
|
||||
'private/get_positions', params={
|
||||
'currency': currency.upper(),
|
||||
'kind': kind})
|
||||
|
@ -311,21 +281,28 @@ class Client:
|
|||
by symbol.
|
||||
"""
|
||||
assets = {}
|
||||
resp = await self.json_rpc(
|
||||
'private/get_account_summaries',
|
||||
params={
|
||||
'extended' : True
|
||||
}
|
||||
resp = await self._json_rpc_auth_wrapper(
|
||||
'public/get_currencies',
|
||||
params={}
|
||||
)
|
||||
summaries = resp.result['summaries']
|
||||
for summary in summaries:
|
||||
currency = summary['currency']
|
||||
tx_tick = Decimal('1e-08')
|
||||
currencies = resp.result
|
||||
for currency in currencies:
|
||||
name = currency['currency']
|
||||
tx_tick = digits_to_dec(currency['fee_precision'])
|
||||
atype='crypto_currency'
|
||||
assets[currency] = Asset(
|
||||
name=currency,
|
||||
assets[name] = Asset(
|
||||
name=name,
|
||||
atype=atype,
|
||||
tx_tick=tx_tick)
|
||||
|
||||
instruments = await self.symbol_info(currency=name)
|
||||
for instrument in instruments:
|
||||
pair = instruments[instrument]
|
||||
assets[pair.symbol] = Asset(
|
||||
name=pair.symbol,
|
||||
atype=pair.venue,
|
||||
tx_tick=pair.size_tick)
|
||||
|
||||
return assets
|
||||
|
||||
async def get_mkt_pairs(self) -> dict[str, Pair]:
|
||||
|
@ -351,7 +328,7 @@ class Client:
|
|||
'type': 'limit',
|
||||
'price': price,
|
||||
}
|
||||
resp = await self.json_rpc(
|
||||
resp = await self._json_rpc_auth_wrapper(
|
||||
f'private/{action}', params)
|
||||
|
||||
return resp.result
|
||||
|
@ -359,7 +336,7 @@ class Client:
|
|||
async def submit_cancel(self, oid: str):
|
||||
"""Send cancel request for order id
|
||||
"""
|
||||
resp = await self.json_rpc(
|
||||
resp = await self._json_rpc_auth_wrapper(
|
||||
'private/cancel', {'order_id': oid})
|
||||
return resp.result
|
||||
|
||||
|
@ -367,7 +344,7 @@ class Client:
|
|||
self,
|
||||
sym: str | None = None,
|
||||
|
||||
venue: MarketType | None = None,
|
||||
venue: MarketType = 'option',
|
||||
expiry: str | None = None,
|
||||
|
||||
) -> dict[str, Pair] | Pair:
|
||||
|
@ -381,7 +358,7 @@ class Client:
|
|||
return cached_pair
|
||||
|
||||
if sym:
|
||||
return pair_table[sym.lower()]
|
||||
return pair_table[sym]
|
||||
else:
|
||||
return self._pairs
|
||||
|
||||
|
@ -407,7 +384,7 @@ class Client:
|
|||
'expired': str(expired).lower()
|
||||
}
|
||||
|
||||
resp: JSONRPCResult = await self.json_rpc(
|
||||
resp: JSONRPCResult = await self._json_rpc_auth_wrapper(
|
||||
'public/get_instruments',
|
||||
params,
|
||||
)
|
||||
|
@ -440,10 +417,32 @@ class Client:
|
|||
|
||||
async def cache_symbols(
|
||||
self,
|
||||
) -> dict:
|
||||
venue: MarketType = 'option',
|
||||
|
||||
if not self._pairs:
|
||||
self._pairs = await self.symbol_info()
|
||||
) -> None:
|
||||
# lookup internal mkt-specific pair table to update
|
||||
pair_table: dict[str, Pair] = self._pairs
|
||||
|
||||
# make API request(s)
|
||||
mkt_pairs = await self.symbol_info()
|
||||
|
||||
if not mkt_pairs:
|
||||
raise SymbolNotFound(f'No market pairs found!?:\n{resp}')
|
||||
|
||||
pairs_view_subtable: dict[str, Pair] = {}
|
||||
|
||||
for instrument in mkt_pairs:
|
||||
pair_type: Type = PAIRTYPES[venue]
|
||||
|
||||
pair: Pair = pair_type(**mkt_pairs[instrument].to_dict())
|
||||
|
||||
pair_table[pair.symbol.upper()] = pair
|
||||
|
||||
# update an additional top-level-cross-venue-table
|
||||
# `._pairs: ChainMap` for search B0
|
||||
pairs_view_subtable[pair.bs_fqme] = pair
|
||||
|
||||
self._pairs.maps.append(pairs_view_subtable)
|
||||
|
||||
return self._pairs
|
||||
|
||||
|
@ -456,20 +455,15 @@ class Client:
|
|||
Fuzzy search symbology set for pairs matching `pattern`.
|
||||
|
||||
'''
|
||||
pairs: dict[str, Pair] = await self.symbol_info()
|
||||
matches: dict[str, Pair] = match_from_pairs(
|
||||
pairs: dict[str, Pair] = await self.exch_info()
|
||||
|
||||
return match_from_pairs(
|
||||
pairs=pairs,
|
||||
query=pattern.upper(),
|
||||
score_cutoff=35,
|
||||
limit=limit
|
||||
)
|
||||
|
||||
# repack in name-keyed table
|
||||
return {
|
||||
pair['instrument_name'].lower(): pair
|
||||
for pair in matches.values()
|
||||
}
|
||||
|
||||
async def bars(
|
||||
self,
|
||||
mkt: MktPair,
|
||||
|
@ -481,7 +475,7 @@ class Client:
|
|||
as_np: bool = True,
|
||||
|
||||
) -> list[tuple] | np.ndarray:
|
||||
instrument: str = mkt.bs_fqme
|
||||
instrument: str = mkt.bs_fqme.split('.')[0]
|
||||
|
||||
if end_dt is None:
|
||||
end_dt = now('UTC')
|
||||
|
@ -494,7 +488,7 @@ class Client:
|
|||
end_time = deribit_timestamp(end_dt)
|
||||
|
||||
# https://docs.deribit.com/#public-get_tradingview_chart_data
|
||||
resp = await self.json_rpc(
|
||||
resp = await self._json_rpc_auth_wrapper(
|
||||
'public/get_tradingview_chart_data',
|
||||
params={
|
||||
'instrument_name': instrument.upper(),
|
||||
|
@ -531,7 +525,7 @@ class Client:
|
|||
instrument: str,
|
||||
count: int = 10
|
||||
):
|
||||
resp = await self.json_rpc(
|
||||
resp = await self._json_rpc_auth_wrapper(
|
||||
'public/get_last_trades_by_instrument',
|
||||
params={
|
||||
'instrument_name': instrument,
|
||||
|
@ -543,7 +537,8 @@ class Client:
|
|||
|
||||
@acm
|
||||
async def get_client(
|
||||
is_brokercheck: bool = False
|
||||
is_brokercheck: bool = False,
|
||||
venue: MarketType = 'option',
|
||||
) -> Client:
|
||||
|
||||
async with (
|
||||
|
@ -553,68 +548,6 @@ async def get_client(
|
|||
) as json_rpc
|
||||
):
|
||||
client = Client(json_rpc)
|
||||
_refresh_token: Optional[str] = None
|
||||
_access_token: Optional[str] = None
|
||||
|
||||
async def _auth_loop(
|
||||
task_status: TaskStatus = trio.TASK_STATUS_IGNORED
|
||||
):
|
||||
"""Background task that adquires a first access token and then will
|
||||
refresh the access token while the nursery isn't cancelled.
|
||||
|
||||
https://docs.deribit.com/?python#authentication-2
|
||||
"""
|
||||
renew_time = 10
|
||||
access_scope = 'trade:read_write'
|
||||
_expiry_time = time.time()
|
||||
got_access = False
|
||||
nonlocal _refresh_token
|
||||
nonlocal _access_token
|
||||
|
||||
while True:
|
||||
if time.time() - _expiry_time < renew_time:
|
||||
# if we are close to token expiry time
|
||||
|
||||
if _refresh_token != None:
|
||||
# if we have a refresh token already dont need to send
|
||||
# secret
|
||||
params = {
|
||||
'grant_type': 'refresh_token',
|
||||
'refresh_token': _refresh_token,
|
||||
'scope': access_scope
|
||||
}
|
||||
|
||||
else:
|
||||
# we don't have refresh token, send secret to initialize
|
||||
params = {
|
||||
'grant_type': 'client_credentials',
|
||||
'client_id': client._key_id,
|
||||
'client_secret': client._key_secret,
|
||||
'scope': access_scope
|
||||
}
|
||||
|
||||
resp = await json_rpc('public/auth', params)
|
||||
result = resp.result
|
||||
|
||||
_expiry_time = time.time() + result['expires_in']
|
||||
_refresh_token = result['refresh_token']
|
||||
|
||||
if 'access_token' in result:
|
||||
_access_token = result['access_token']
|
||||
|
||||
if not got_access:
|
||||
# first time this loop runs we must indicate task is
|
||||
# started, we have auth
|
||||
got_access = True
|
||||
task_status.started()
|
||||
|
||||
else:
|
||||
await trio.sleep(renew_time / 2)
|
||||
|
||||
# if we have client creds launch auth loop
|
||||
if client._key_id is not None:
|
||||
await n.start(_auth_loop)
|
||||
|
||||
await client.cache_symbols()
|
||||
yield client
|
||||
n.cancel_scope.cancel()
|
||||
|
@ -622,7 +555,7 @@ async def get_client(
|
|||
|
||||
@acm
|
||||
async def open_feed_handler():
|
||||
fh = FeedHandler(config=get_fh_config())
|
||||
fh = FeedHandler(config=get_config())
|
||||
yield fh
|
||||
await to_asyncio.run_task(fh.stop_async)
|
||||
|
||||
|
|
|
@ -160,38 +160,26 @@ async def get_mkt_info(
|
|||
assets: dict[str, Asset] = await client.get_assets()
|
||||
pair_str: str = mkt_ep.lower()
|
||||
|
||||
# switch venue-mode depending on input pattern parsing
|
||||
# since we want to use a particular endpoint (set) for
|
||||
# pair info lookup!
|
||||
client.mkt_mode = mkt_mode
|
||||
|
||||
pair: Pair = await client.exch_info(
|
||||
sym=pair_str,
|
||||
)
|
||||
dst: Asset | None = assets.get(pair.bs_dst_asset)
|
||||
if (
|
||||
not dst
|
||||
# TODO: a known asset DNE list?
|
||||
# and pair.baseAsset == 'DEFI'
|
||||
):
|
||||
log.warning(
|
||||
f'UNKNOWN {venue} asset {pair.base_currency} from,\n'
|
||||
f'{pformat(pair.to_dict())}'
|
||||
)
|
||||
mkt_mode = pair.venue
|
||||
client.mkt_mode = mkt_mode
|
||||
|
||||
# XXX UNKNOWN missing "asset", though no idea why?
|
||||
# maybe it's only avail in the margin venue(s): /dapi/ ?
|
||||
return None
|
||||
dst: Asset | None = assets.get(pair.bs_dst_asset)
|
||||
src: Asset | None = assets.get(pair.bs_src_asset)
|
||||
|
||||
mkt = MktPair(
|
||||
dst=dst,
|
||||
src=assets.get(pair.bs_src_asset),
|
||||
src=src,
|
||||
price_tick=pair.price_tick,
|
||||
size_tick=pair.size_tick,
|
||||
bs_mktid=pair.symbol,
|
||||
expiry=expiry,
|
||||
venue=venue,
|
||||
expiry=pair.expiry,
|
||||
venue=mkt_mode,
|
||||
broker='deribit',
|
||||
_atype=mkt_mode,
|
||||
_fqme_without_src=True,
|
||||
)
|
||||
return mkt, pair
|
||||
|
||||
|
@ -210,7 +198,7 @@ async def stream_quotes(
|
|||
# XXX: required to propagate ``tractor`` loglevel to piker logging
|
||||
get_console_log(loglevel or tractor.current_actor().loglevel)
|
||||
|
||||
sym = symbols[0]
|
||||
sym = symbols[0].split('.')[0]
|
||||
|
||||
init_msgs: list[FeedInit] = []
|
||||
|
||||
|
@ -225,11 +213,11 @@ async def stream_quotes(
|
|||
init_msgs.append(
|
||||
FeedInit(mkt_info=mkt)
|
||||
)
|
||||
nsym = piker_sym_to_cb_sym(sym.split('.')[0])
|
||||
nsym = piker_sym_to_cb_sym(sym)
|
||||
|
||||
async with maybe_open_price_feed(sym) as stream:
|
||||
|
||||
cache = await client.cache_symbols()
|
||||
cache = client._pairs
|
||||
|
||||
last_trades = (await client.last_trades(
|
||||
cb_sym_to_deribit_inst(nsym), count=1)).trades
|
||||
|
@ -271,7 +259,7 @@ async def open_symbol_search(
|
|||
async with open_cached_client('deribit') as client:
|
||||
|
||||
# load all symbols locally for fast search
|
||||
cache = await client.cache_symbols()
|
||||
cache = client._pairs
|
||||
await ctx.started()
|
||||
|
||||
async with ctx.open_stream() as stream:
|
||||
|
|
|
@ -19,6 +19,7 @@ Per market data-type definitions and schemas types.
|
|||
|
||||
"""
|
||||
from __future__ import annotations
|
||||
import pendulum
|
||||
from typing import (
|
||||
Literal,
|
||||
)
|
||||
|
@ -66,29 +67,27 @@ class Pair(Struct, frozen=True, kw_only=True):
|
|||
# dst
|
||||
base_currency: str # "BTC",
|
||||
|
||||
tick_size: float # 0.0001
|
||||
tick_size_steps: list[dict[str, str | int | float]] # [{'above_price': 0.005, 'tick_size': 0.0005}]
|
||||
tick_size: float # 0.0001 # [{'above_price': 0.005, 'tick_size': 0.0005}]
|
||||
tick_size_steps: list[dict[str, float]]
|
||||
|
||||
@property
|
||||
def price_tick(self) -> Decimal:
|
||||
step_size: float = self.tick_size_steps[0].get('above_price')
|
||||
return Decimal(step_size)
|
||||
return Decimal(str(self.tick_size_steps[0]['above_price']))
|
||||
|
||||
@property
|
||||
def size_tick(self) -> Decimal:
|
||||
step_size: float = self.tick_size_steps[0].get('tick_size')
|
||||
return Decimal(step_size)
|
||||
return Decimal(str(self.tick_size))
|
||||
|
||||
@property
|
||||
def bs_fqme(self) -> str:
|
||||
return self.symbol
|
||||
return f'{self.symbol}'
|
||||
|
||||
@property
|
||||
def bs_mktid(self) -> str:
|
||||
return f'{self.symbol}.{self.venue}'
|
||||
|
||||
|
||||
class OptionPair(Pair, frozen=True, kw_only=True):
|
||||
class OptionPair(Pair, frozen=True):
|
||||
|
||||
taker_commission: float # 0.0003
|
||||
strike: float # 5000.0
|
||||
|
@ -116,13 +115,18 @@ class OptionPair(Pair, frozen=True, kw_only=True):
|
|||
# NOTE: see `.data._symcache.SymbologyCache.load()` for why
|
||||
ns_path: str = 'piker.brokers.deribit:OptionPair'
|
||||
|
||||
@property
|
||||
def expiry(self) -> str:
|
||||
iso_date = pendulum.from_timestamp(self.expiration_timestamp / 1000).isoformat()
|
||||
return iso_date
|
||||
|
||||
@property
|
||||
def venue(self) -> str:
|
||||
return 'OPTION'
|
||||
return 'option'
|
||||
|
||||
@property
|
||||
def bs_fqme(self) -> str:
|
||||
return f'{self.symbol}.OPTION'
|
||||
return f'{self.symbol}'
|
||||
|
||||
@property
|
||||
def bs_src_asset(self) -> str:
|
||||
|
@ -130,13 +134,58 @@ class OptionPair(Pair, frozen=True, kw_only=True):
|
|||
|
||||
@property
|
||||
def bs_dst_asset(self) -> str:
|
||||
return f'{self.base_currency}'
|
||||
|
||||
@property
|
||||
def bs_mktid(self) -> str:
|
||||
return f'{self.symbol}.{self.venue}'
|
||||
return f'{self.symbol}'
|
||||
|
||||
|
||||
PAIRTYPES: dict[MarketType, Pair] = {
|
||||
'option': OptionPair,
|
||||
}
|
||||
|
||||
|
||||
class JSONRPCResult(Struct):
|
||||
id: int
|
||||
usIn: int
|
||||
usOut: int
|
||||
usDiff: int
|
||||
testnet: bool
|
||||
jsonrpc: str = '2.0'
|
||||
error: Optional[dict] = None
|
||||
result: Optional[list[dict]] = None
|
||||
|
||||
class JSONRPCChannel(Struct):
|
||||
method: str
|
||||
params: dict
|
||||
jsonrpc: str = '2.0'
|
||||
|
||||
|
||||
class KLinesResult(Struct):
|
||||
low: list[float]
|
||||
cost: list[float]
|
||||
high: list[float]
|
||||
open: list[float]
|
||||
close: list[float]
|
||||
ticks: list[int]
|
||||
status: str
|
||||
volume: list[float]
|
||||
|
||||
class Trade(Struct):
|
||||
iv: float
|
||||
price: float
|
||||
amount: float
|
||||
trade_id: str
|
||||
contracts: float
|
||||
direction: str
|
||||
trade_seq: int
|
||||
timestamp: int
|
||||
mark_price: float
|
||||
index_price: float
|
||||
tick_direction: int
|
||||
instrument_name: str
|
||||
combo_id: Optional[str] = '',
|
||||
combo_trade_id: Optional[int] = 0,
|
||||
block_trade_id: Optional[str] = '',
|
||||
block_trade_leg_count: Optional[int] = 0,
|
||||
|
||||
class LastTradesResult(Struct):
|
||||
trades: list[Trade]
|
||||
has_more: bool
|
||||
|
|
|
@ -273,7 +273,7 @@ async def _reconnect_forever(
|
|||
nobsws._connected.set()
|
||||
await trio.sleep_forever()
|
||||
except HandshakeError:
|
||||
log.exception(f'Retrying connection')
|
||||
log.exception('Retrying connection')
|
||||
|
||||
# ws & nursery block ends
|
||||
|
||||
|
@ -359,8 +359,8 @@ async def open_autorecon_ws(
|
|||
|
||||
|
||||
'''
|
||||
JSONRPC response-request style machinery for transparent multiplexing of msgs
|
||||
over a NoBsWs.
|
||||
JSONRPC response-request style machinery for transparent multiplexing
|
||||
of msgs over a NoBsWs.
|
||||
|
||||
'''
|
||||
|
||||
|
@ -377,16 +377,20 @@ async def open_jsonrpc_session(
|
|||
url: str,
|
||||
start_id: int = 0,
|
||||
response_type: type = JSONRPCResult,
|
||||
request_type: Optional[type] = None,
|
||||
request_hook: Optional[Callable] = None,
|
||||
error_hook: Optional[Callable] = None,
|
||||
# request_type: Optional[type] = None,
|
||||
# request_hook: Optional[Callable] = None,
|
||||
# error_hook: Optional[Callable] = None,
|
||||
) -> Callable[[str, dict], dict]:
|
||||
|
||||
# NOTE, store all request msgs so we can raise errors on the
|
||||
# caller side!
|
||||
req_msgs: dict[int, dict] = {}
|
||||
|
||||
async with (
|
||||
trio.open_nursery() as n,
|
||||
open_autorecon_ws(url) as ws
|
||||
):
|
||||
rpc_id: Iterable = count(start_id)
|
||||
rpc_id: Iterable[int] = count(start_id)
|
||||
rpc_results: dict[int, dict] = {}
|
||||
|
||||
async def json_rpc(method: str, params: dict) -> dict:
|
||||
|
@ -394,26 +398,40 @@ async def open_jsonrpc_session(
|
|||
perform a json rpc call and wait for the result, raise exception in
|
||||
case of error field present on response
|
||||
'''
|
||||
nonlocal req_msgs
|
||||
|
||||
req_id: int = next(rpc_id)
|
||||
msg = {
|
||||
'jsonrpc': '2.0',
|
||||
'id': next(rpc_id),
|
||||
'id': req_id,
|
||||
'method': method,
|
||||
'params': params
|
||||
}
|
||||
_id = msg['id']
|
||||
|
||||
rpc_results[_id] = {
|
||||
result = rpc_results[_id] = {
|
||||
'result': None,
|
||||
'event': trio.Event()
|
||||
'error': None,
|
||||
'event': trio.Event(), # signal caller resp arrived
|
||||
}
|
||||
req_msgs[_id] = msg
|
||||
|
||||
await ws.send_msg(msg)
|
||||
|
||||
# wait for reponse before unblocking requester code
|
||||
await rpc_results[_id]['event'].wait()
|
||||
|
||||
ret = rpc_results[_id]['result']
|
||||
if (maybe_result := result['result']):
|
||||
ret = maybe_result
|
||||
del rpc_results[_id]
|
||||
|
||||
del rpc_results[_id]
|
||||
else:
|
||||
err = result['error']
|
||||
raise Exception(
|
||||
f'JSONRPC request failed\n'
|
||||
f'req: {msg}\n'
|
||||
f'resp: {err}\n'
|
||||
)
|
||||
|
||||
if ret.error is not None:
|
||||
raise Exception(json.dumps(ret.error, indent=4))
|
||||
|
@ -428,6 +446,7 @@ async def open_jsonrpc_session(
|
|||
the server side.
|
||||
|
||||
'''
|
||||
nonlocal req_msgs
|
||||
async for msg in ws:
|
||||
match msg:
|
||||
case {
|
||||
|
@ -451,15 +470,29 @@ async def open_jsonrpc_session(
|
|||
'params': _,
|
||||
}:
|
||||
log.debug(f'Recieved\n{msg}')
|
||||
if request_hook:
|
||||
await request_hook(request_type(**msg))
|
||||
# if request_hook:
|
||||
# await request_hook(request_type(**msg))
|
||||
|
||||
case {
|
||||
'error': error
|
||||
}:
|
||||
log.warning(f'Recieved\n{error}')
|
||||
if error_hook:
|
||||
await error_hook(response_type(**msg))
|
||||
# if error_hook:
|
||||
# await error_hook(response_type(**msg))
|
||||
|
||||
# retreive orig request msg, set error
|
||||
# response in original "result" msg,
|
||||
# THEN FINALLY set the event to signal caller
|
||||
# to raise the error in the parent task.
|
||||
req_id: int = msg['id']
|
||||
req_msg: dict = req_msgs[req_id]
|
||||
result: dict = rpc_results[req_id]
|
||||
result['error'] = error
|
||||
result['event'].set()
|
||||
log.error(
|
||||
f'JSONRPC request failed\n'
|
||||
f'req: {req_msg}\n'
|
||||
f'resp: {error}\n'
|
||||
)
|
||||
|
||||
case _:
|
||||
log.warning(f'Unhandled JSON-RPC msg!?\n{msg}')
|
||||
|
|
Loading…
Reference in New Issue