Deribit broker fix #8

Open
ntorres wants to merge 24 commits from deribit_fix into nix-qt6-fix
8 changed files with 1517 additions and 144 deletions

View File

@ -51,6 +51,7 @@ __brokers__: list[str] = [
'ib',
'kraken',
'kucoin',
'deribit',
# broken but used to work
# 'questrade',
@ -61,7 +62,6 @@ __brokers__: list[str] = [
# wstrade
# iex
# deribit
# bitso
]

View File

@ -25,6 +25,7 @@ from .api import (
get_client,
)
from .feed import (
get_mkt_info,
open_history_client,
open_symbol_search,
stream_quotes,
@ -34,15 +35,20 @@ from .feed import (
# open_trade_dialog,
# norm_trade_records,
# )
from .venues import (
OptionPair,
)
log = get_logger(__name__)
__all__ = [
'get_client',
# 'trades_dialogue',
'get_mkt_info',
'open_history_client',
'open_symbol_search',
'stream_quotes',
'OptionPair',
# 'norm_trade_records',
]

View File

@ -19,10 +19,14 @@ Deribit backend.
'''
import asyncio
from collections import ChainMap
from contextlib import (
asynccontextmanager as acm,
)
from datetime import datetime
from decimal import (
Decimal,
)
from functools import partial
import time
from typing import (
@ -31,7 +35,7 @@ from typing import (
Callable,
)
import pendulum
from pendulum import now
import trio
from trio_typing import TaskStatus
from rapidfuzz import process as fuzzy
@ -51,7 +55,18 @@ from cryptofeed.defines import (
OPTION, CALL, PUT
)
from cryptofeed.symbols import Symbol
# types for managing the cb callbacks.
# from cryptofeed.types import L1Book
from .venues import (
MarketType,
PAIRTYPES,
Pair,
OptionPair,
)
from piker.accounting import (
Asset,
MktPair,
)
from piker.data import (
def_iohlcv_fields,
match_from_pairs,
@ -80,19 +95,19 @@ _testnet_ws_url = 'wss://test.deribit.com/ws/api/v2'
class JSONRPCResult(Struct):
jsonrpc: str = '2.0'
id: int
result: Optional[list[dict]] = None
error: Optional[dict] = None
usIn: int
usOut: int
usDiff: int
testnet: bool
jsonrpc: str = '2.0'
result: Optional[list[dict]] = None
error: Optional[dict] = None
class JSONRPCChannel(Struct):
jsonrpc: str = '2.0'
method: str
params: dict
jsonrpc: str = '2.0'
class KLinesResult(Struct):
@ -116,9 +131,12 @@ class Trade(Struct):
instrument_name: str
index_price: float
direction: str
contracts: float
amount: float
combo_trade_id: Optional[int] = 0,
combo_id: Optional[str] = '',
amount: float
block_trade_leg_count: Optional[int] = 0,
block_trade_id: Optional[str] = '',
class LastTradesResult(Struct):
trades: list[Trade]
@ -142,13 +160,15 @@ def str_to_cb_sym(name: str) -> Symbol:
else:
raise Exception("Couldn\'t parse option type")
new_expiry_date = get_values_from_cb_normalized_date(expiry_date)
return Symbol(
base, quote,
base=base,
quote=quote,
type=OPTION,
strike_price=strike_price,
option_type=option_type,
expiry_date=expiry_date,
expiry_normalize=False)
expiry_date=new_expiry_date)
def piker_sym_to_cb_sym(name: str) -> Symbol:
@ -159,76 +179,110 @@ def piker_sym_to_cb_sym(name: str) -> Symbol:
if option_type == 'P':
option_type = PUT
elif option_type == 'C':
elif option_type == 'C':
option_type = CALL
else:
raise Exception("Couldn\'t parse option type")
return Symbol(
base, quote,
base=base,
quote=quote,
type=OPTION,
strike_price=strike_price,
option_type=option_type,
expiry_date=expiry_date.upper())
expiry_date=expiry_date)
def cb_sym_to_deribit_inst(sym: Symbol):
# cryptofeed normalized
cb_norm = ['F', 'G', 'H', 'J', 'K', 'M', 'N', 'Q', 'U', 'V', 'X', 'Z']
# deribit specific
months = ['JAN', 'FEB', 'MAR', 'APR', 'MAY', 'JUN', 'JUL', 'AUG', 'SEP', 'OCT', 'NOV', 'DEC']
exp = sym.expiry_date
# YYMDD
# 01234
year, month, day = (
exp[:2], months[cb_norm.index(exp[2:3])], exp[3:])
new_expiry_date = get_values_from_cb_normalized_date(sym.expiry_date)
otype = 'C' if sym.option_type == CALL else 'P'
return f'{sym.base}-{day}{month}{year}-{sym.strike_price}-{otype}'
return f'{sym.base}-{new_expiry_date}-{sym.strike_price}-{otype}'
def get_values_from_cb_normalized_date(expiry_date: str) -> str:
# deribit specific
cb_norm = [
'F', 'G', 'H', 'J',
'K', 'M', 'N', 'Q',
'U', 'V', 'X', 'Z'
]
months = [
'JAN', 'FEB', 'MAR', 'APR',
'MAY', 'JUN', 'JUL', 'AUG',
'SEP', 'OCT', 'NOV', 'DEC'
]
# YYMDD
# 01234
day, month, year = (
expiry_date[3:],
months[cb_norm.index(expiry_date[2:3])],
expiry_date[:2]
)
return f'{day}{month}{year}'
def get_config() -> dict[str, Any]:
conf, path = config.load()
conf: dict
path: Path
conf, path = config.load(
conf_name='brokers',
touch_if_dne=True,
)
section: dict = {}
section = conf.get('deribit')
# TODO: document why we send this, basically because logging params for cryptofeed
conf['log'] = {}
conf['log']['disabled'] = True
section['log'] = {}
section['log']['filename'] = 'feedhandler.log'
section['log']['level'] = 'DEBUG'
section['log']['disabled'] = True
if section is None:
log.warning(f'No config section found for deribit in {path}')
return {}
return conf
return section
def get_fh_config() -> dict[str, Any]:
conf_option = get_config().get('option', {})
conf_log = get_config().get('log', {})
return {
'log': {
'filename': conf_log.get('filename'),
'level': conf_log.get('level'),
'disabled': conf_log.get('disabled')
},
'deribit': {
'key_id': conf_option.get('api_key'),
'key_secret': conf_option.get('api_secret')
}
}
class Client:
def __init__(self, json_rpc: Callable) -> None:
self._pairs: dict[str, Any] = None
def __init__(
self,
config = get_config().get('deribit', {})
json_rpc: Callable
if ('key_id' in config) and ('key_secret' in config):
self._key_id = config['key_id']
self._key_secret = config['key_secret']
) -> None:
self._pairs: ChainMap[str, Pair] = ChainMap()
else:
self._key_id = None
self._key_secret = None
config = get_config().get('option', {})
self._key_id = config.get('api_key')
self._key_secret = config.get('api_secret')
self.json_rpc = json_rpc
@property
def currencies(self):
return ['btc', 'eth', 'sol', 'usd']
async def get_balances(self, kind: str = 'option') -> dict[str, float]:
async def get_balances(
self,
kind: str = 'option'
) -> dict[str, float]:
"""Return the set of positions for this account
by symbol.
"""
@ -244,20 +298,39 @@ class Client:
return balances
async def get_assets(self) -> dict[str, float]:
async def get_assets(
self,
venue: str | None = None,
) -> dict[str, Asset]:
"""Return the set of asset balances for this account
by symbol.
"""
balances = {}
assets = {}
resp = await self.json_rpc(
'private/get_account_summaries',
params={
'extended' : True
}
)
summaries = resp.result['summaries']
for summary in summaries:
currency = summary['currency']
tx_tick = Decimal('1e-08')
atype='crypto_currency'
assets[currency] = Asset(
name=currency,
atype=atype,
tx_tick=tx_tick)
return assets
for currency in self.currencies:
resp = await self.json_rpc(
'private/get_account_summary', params={
'currency': currency.upper()})
async def get_mkt_pairs(self) -> dict[str, Pair]:
flat: dict[str, Pair] = {}
for key in self._pairs:
item = self._pairs.get(key)
flat[item.bs_fqme] = item
balances[currency] = resp.result['balance']
return balances
return flat
async def submit_limit(
self,
@ -286,6 +359,28 @@ class Client:
'private/cancel', {'order_id': oid})
return resp.result
async def exch_info(
self,
sym: str | None = None,
venue: MarketType | None = None,
expiry: str | None = None,
) -> dict[str, Pair] | Pair:
pair_table: dict[str, Pair] = self._pairs
if (
sym
and (cached_pair := pair_table.get(sym))
):
return cached_pair
if sym:
return pair_table[sym.lower()]
else:
return self._pairs
async def symbol_info(
self,
instrument: Optional[str] = None,
@ -293,7 +388,7 @@ class Client:
kind: str = 'option',
expired: bool = False
) -> dict[str, dict]:
) -> dict[str, Pair] | Pair:
'''
Get symbol infos.
@ -313,14 +408,29 @@ class Client:
params,
)
# convert to symbol-keyed table
pair_type: Type = PAIRTYPES[kind]
results: list[dict] | None = resp.result
instruments: dict[str, dict] = {
item['instrument_name'].lower(): item
for item in results
}
instruments: dict[str, Pair] = {}
for item in results:
symbol=item['instrument_name'].lower()
try:
pair: Pair = pair_type(
symbol=symbol,
**item
)
except Exception as e:
e.add_note(
"\nDon't panic, prolly stupid deribit changed their symbology schema again..\n"
'Check out their API docs here:\n\n'
'https://docs.deribit.com/?python#deribit-api-v2-1-1'
)
raise
instruments[symbol] = pair
if instrument is not None:
return instruments[instrument]
return instruments[instrument.lower()]
else:
return instruments
@ -337,12 +447,12 @@ class Client:
self,
pattern: str,
limit: int = 30,
) -> dict[str, Any]:
) -> dict[str, Pair]:
'''
Fuzzy search symbology set for pairs matching `pattern`.
'''
pairs: dict[str, Any] = await self.symbol_info()
pairs: dict[str, Pair] = await self.symbol_info()
matches: dict[str, Pair] = match_from_pairs(
pairs=pairs,
query=pattern.upper(),
@ -352,22 +462,25 @@ class Client:
# repack in name-keyed table
return {
pair['instrument_name'].lower(): pair
pair.instrument_name.lower(): pair
for pair in matches.values()
}
async def bars(
self,
symbol: str,
mkt: MktPair,
start_dt: Optional[datetime] = None,
end_dt: Optional[datetime] = None,
limit: int = 1000,
as_np: bool = True,
) -> dict:
instrument = symbol
) -> list[tuple] | np.ndarray:
instrument: str = mkt.bs_mktid
if end_dt is None:
end_dt = pendulum.now('UTC')
end_dt = now('UTC')
if start_dt is None:
start_dt = end_dt.start_of(
@ -387,29 +500,27 @@ class Client:
})
result = KLinesResult(**resp.result)
new_bars = []
new_bars: list[tuple] = []
for i in range(len(result.close)):
_open = result.open[i]
high = result.high[i]
low = result.low[i]
close = result.close[i]
volume = result.volume[i]
row = [
row = [
(start_time + (i * (60 * 1000))) / 1000.0, # time
result.open[i],
result.high[i],
result.low[i],
result.close[i],
result.volume[i],
0
result.volume[i]
]
new_bars.append((i,) + tuple(row))
array = np.array(new_bars, dtype=def_iohlcv_fields) if as_np else klines
return array
if not as_np:
return result
return np.array(
new_bars,
dtype=def_iohlcv_fields
)
async def last_trades(
self,
@ -434,10 +545,10 @@ async def get_client(
async with (
trio.open_nursery() as n,
open_jsonrpc_session(
_testnet_ws_url, dtype=JSONRPCResult) as json_rpc
_ws_url, response_type=JSONRPCResult
) as json_rpc
):
client = Client(json_rpc)
_refresh_token: Optional[str] = None
_access_token: Optional[str] = None
@ -507,7 +618,7 @@ async def get_client(
@acm
async def open_feed_handler():
fh = FeedHandler(config=get_config())
fh = FeedHandler(config=get_fh_config())
yield fh
await to_asyncio.run_task(fh.stop_async)
@ -523,7 +634,7 @@ async def maybe_open_feed_handler() -> trio.abc.ReceiveStream:
async def aio_price_feed_relay(
fh: FeedHandler,
instrument: Symbol,
instrument: str,
from_trio: asyncio.Queue,
to_trio: trio.abc.SendChannel,
) -> None:
@ -542,21 +653,33 @@ async def aio_price_feed_relay(
'symbol': cb_sym_to_deribit_inst(
str_to_cb_sym(data.symbol)).lower(),
'ticks': [
{'type': 'bid',
'price': float(data.bid_price), 'size': float(data.bid_size)},
{'type': 'bsize',
'price': float(data.bid_price), 'size': float(data.bid_size)},
{'type': 'ask',
'price': float(data.ask_price), 'size': float(data.ask_size)},
{'type': 'asize',
'price': float(data.ask_price), 'size': float(data.ask_size)}
{
'type': 'bid',
'price': float(data.bid_price),
'size': float(data.bid_size)
},
{
'type': 'bsize',
'price': float(data.bid_price),
'size': float(data.bid_size)
},
{
'type': 'ask',
'price': float(data.ask_price),
'size': float(data.ask_size)
},
{
'type': 'asize',
'price': float(data.ask_price),
'size': float(data.ask_size)
}
]
}))
sym: Symbol = piker_sym_to_cb_sym(instrument)
fh.add_feed(
DERIBIT,
channels=[TRADES, L1_BOOK],
symbols=[piker_sym_to_cb_sym(instrument)],
symbols=[sym],
callbacks={
TRADES: _trade,
L1_BOOK: _l1
@ -597,9 +720,9 @@ async def maybe_open_price_feed(
async with maybe_open_context(
acm_func=open_price_feed,
kwargs={
'instrument': instrument
'instrument': instrument.split('.')[0]
},
key=f'{instrument}-price',
key=f'{instrument.split('.')[0]}-price',
) as (cache_hit, feed):
if cache_hit:
yield broadcast_receiver(feed, 10)
@ -664,10 +787,10 @@ async def maybe_open_order_feed(
async with maybe_open_context(
acm_func=open_order_feed,
kwargs={
'instrument': instrument,
'instrument': instrument.split('.')[0],
'fh': fh
},
key=f'{instrument}-order',
key=f'{instrument.split('.')[0]}-order',
) as (cache_hit, feed):
if cache_hit:
yield broadcast_receiver(feed, 10)

View File

@ -21,18 +21,33 @@ Deribit backend.
from contextlib import asynccontextmanager as acm
from datetime import datetime
from typing import Any, Optional, Callable
from pprint import pformat
import time
import trio
from trio_typing import TaskStatus
import pendulum
from pendulum import (
from_timestamp,
now,
)
from rapidfuzz import process as fuzzy
import numpy as np
import tractor
from piker.brokers import open_cached_client
from piker.accounting import (
MktPair,
unpack_fqme,
)
from piker.brokers import (
open_cached_client,
NoData,
)
from piker._cacheables import (
async_lifo_cache,
)
from piker.log import get_logger, get_console_log
from piker.data import ShmArray
from piker.data.validate import FeedInit
from piker.brokers._util import (
BrokerError,
DataUnavailable,
@ -47,9 +62,13 @@ from cryptofeed.symbols import Symbol
from .api import (
Client, Trade,
get_config,
str_to_cb_sym, piker_sym_to_cb_sym, cb_sym_to_deribit_inst,
piker_sym_to_cb_sym, cb_sym_to_deribit_inst,
maybe_open_price_feed
)
from .venues import (
Pair,
OptionPair,
)
_spawn_kwargs = {
'infect_asyncio': True,
@ -64,36 +83,119 @@ async def open_history_client(
mkt: MktPair,
) -> tuple[Callable, int]:
fnstrument: str = mkt.bs_fqme
# TODO implement history getter for the new storage layer.
async with open_cached_client('deribit') as client:
async def get_ohlc(
end_dt: Optional[datetime] = None,
start_dt: Optional[datetime] = None,
timeframe: float,
end_dt: datetime | None = None,
start_dt: datetime | None = None,
) -> tuple[
np.ndarray,
datetime, # start
datetime, # end
]:
if timeframe != 60:
raise DataUnavailable('Only 1m bars are supported')
array = await client.bars(
instrument,
array: np.ndarray = await client.bars(
mkt,
start_dt=start_dt,
end_dt=end_dt,
)
if len(array) == 0:
raise DataUnavailable
raise NoData(
f'No frame for {start_dt} -> {end_dt}\n'
)
start_dt = pendulum.from_timestamp(array[0]['time'])
end_dt = pendulum.from_timestamp(array[-1]['time'])
start_dt = from_timestamp(array[0]['time'])
end_dt = from_timestamp(array[-1]['time'])
times = array['time']
if not times.any():
raise ValueError(
'Bad frame with null-times?\n\n'
f'{times}'
)
if end_dt is None:
inow: int = round(time.time())
if (inow - times[-1]) > 60:
await tractor.pause()
return array, start_dt, end_dt
yield get_ohlc, {'erlangs': 3, 'rate': 3}
@async_lifo_cache()
async def get_mkt_info(
fqme: str,
) -> tuple[MktPair, Pair] | None:
# uppercase since kraken bs_mktid is always upper
if 'deribit' not in fqme.lower():
fqme += '.deribit'
mkt_mode: str = ''
broker, mkt_ep, venue, expiry = unpack_fqme(fqme)
# NOTE: we always upper case all tokens to be consistent with
# binance's symbology style for pairs, like `BTCUSDT`, but in
# theory we could also just keep things lower case; as long as
# we're consistent and the symcache matches whatever this func
# returns, always!
expiry: str = expiry.upper()
venue: str = venue.upper()
venue_lower: str = venue.lower()
mkt_mode: str = 'option'
async with open_cached_client(
'deribit',
) as client:
assets: dict[str, Asset] = await client.get_assets()
pair_str: str = mkt_ep.lower()
# switch venue-mode depending on input pattern parsing
# since we want to use a particular endpoint (set) for
# pair info lookup!
client.mkt_mode = mkt_mode
pair: Pair = await client.exch_info(
sym=pair_str,
)
dst: Asset | None = assets.get(pair.bs_dst_asset)
if (
not dst
# TODO: a known asset DNE list?
# and pair.baseAsset == 'DEFI'
):
log.warning(
f'UNKNOWN {venue} asset {pair.base_currency} from,\n'
f'{pformat(pair.to_dict())}'
)
# XXX UNKNOWN missing "asset", though no idea why?
# maybe it's only avail in the margin venue(s): /dapi/ ?
return None
mkt = MktPair(
dst=dst,
src=assets.get(pair.bs_src_asset),
price_tick=pair.price_tick,
size_tick=pair.size_tick,
bs_mktid=pair.symbol,
expiry=expiry,
venue=venue,
broker='deribit',
)
return mkt, pair
async def stream_quotes(
send_chan: trio.abc.SendChannel,
@ -110,25 +212,20 @@ async def stream_quotes(
sym = symbols[0]
init_msgs: list[FeedInit] = []
async with (
open_cached_client('deribit') as client,
send_chan as send_chan
):
init_msgs = {
# pass back token, and bool, signalling if we're the writer
# and that history has been written
sym: {
'symbol_info': {
'asset_type': 'option',
'price_tick_size': 0.0005
},
'shm_write_opts': {'sum_tick_vml': False},
'fqsn': sym,
},
}
mkt, pair = await get_mkt_info(sym)
nsym = piker_sym_to_cb_sym(sym)
# build out init msgs according to latest spec
init_msgs.append(
FeedInit(mkt_info=mkt)
)
nsym = piker_sym_to_cb_sym(sym.split('.')[0])
async with maybe_open_price_feed(sym) as stream:
@ -179,7 +276,16 @@ async def open_symbol_search(
async with ctx.open_stream() as stream:
pattern: str
async for pattern in stream:
# repack in dict form
await stream.send(
await client.search_symbols(pattern))
# NOTE: pattern fuzzy-matching is done within
# the methd impl.
pairs: dict[str, Pair] = await client.search_symbols(
pattern,
)
# repack in fqme-keyed table
byfqme: dict[str, Pair] = {}
for pair in pairs.values():
byfqme[pair.bs_fqme] = pair
await stream.send(byfqme)

View File

@ -0,0 +1,148 @@
# piker: trading gear for hackers
# Copyright (C) Tyler Goodlet (in stewardship for pikers)
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
Per market data-type definitions and schemas types.
"""
from __future__ import annotations
from typing import (
Literal,
)
from decimal import Decimal
from msgspec import field
from piker.types import Struct
# API endpoint paths by venue / sub-API
_domain: str = 'deribit.com'
_url = f'https://www.{_domain}'
# WEBsocketz
_ws_url: str = f'wss://www.{_domain}/ws/api/v2'
# test nets
_testnet_ws_url: str = f'wss://test.{_domain}/ws/api/v2'
MarketType = Literal[
'option'
]
def get_api_eps(venue: MarketType) -> tuple[str, str]:
'''
Return API ep root paths per venue.
'''
return {
'option': (
_ws_url,
),
}[venue]
class Pair(Struct, frozen=True, kw_only=True):
symbol: str
# src
quote_currency: str # 'BTC'
# dst
base_currency: str # "BTC",
tick_size: float # 0.0001
tick_size_steps: list[dict[str, str | int | float]] # [{'above_price': 0.005, 'tick_size': 0.0005}]
@property
def price_tick(self) -> Decimal:
step_size: float = self.tick_size_steps[0].get('above_price')
return Decimal(step_size)
@property
def size_tick(self) -> Decimal:
step_size: float = self.tick_size_steps[0].get('tick_size')
return Decimal(step_size)
@property
def bs_fqme(self) -> str:
return self.symbol
@property
def bs_mktid(self) -> str:
return f'{self.symbol}'
class OptionPair(Pair, frozen=True, kw_only=True):
taker_commission: float # 0.0003
strike: float # 5000.0
settlement_period: str # 'day'
settlement_currency: str # "BTC",
rfq: bool # false
price_index: str # 'btc_usd'
option_type: str # 'call'
min_trade_amount: float # 0.1
maker_commission: float # 0.0003
kind: str # 'option'
is_active: bool # true
instrument_type: str # 'reversed'
instrument_name: str # 'BTC-1SEP24-55000-C'
instrument_id: int # 364671
expiration_timestamp: int # 1725177600000
creation_timestamp: int # 1724918461000
counter_currency: str # 'USD'
contract_size: float # '1.0'
block_trade_tick_size: float # '0.0001'
block_trade_min_trade_amount: int # '25'
block_trade_commission: float # '0.003'
# NOTE: see `.data._symcache.SymbologyCache.load()` for why
ns_path: str = 'piker.brokers.deribit:OptionPair'
@property
def expiry(self) -> str:
symbol: str = self.instrument_name.lower()
pair, expiry, strike_price, otype = symbol.split('-')
return f'{expiry}'
@property
def venue(self) -> str:
return 'OPTION'
@property
def bs_fqme(self) -> str:
return f'{self.symbol}.{self.venue}.{self.expiry}'
@property
def bs_src_asset(self) -> str:
return f'{self.quote_currency}'
@property
def bs_dst_asset(self) -> str:
return f'{self.base_currency}'
@property
def bs_mktid(self) -> str:
return f'{self.symbol}'
PAIRTYPES: dict[MarketType, Pair] = {
'option': OptionPair,
}

View File

@ -273,7 +273,7 @@ async def _reconnect_forever(
nobsws._connected.set()
await trio.sleep_forever()
except HandshakeError:
log.exception(f'Retrying connection')
log.exception('Retrying connection')
# ws & nursery block ends
@ -359,8 +359,8 @@ async def open_autorecon_ws(
'''
JSONRPC response-request style machinery for transparent multiplexing of msgs
over a NoBsWs.
JSONRPC response-request style machinery for transparent multiplexing
of msgs over a NoBsWs.
'''
@ -377,16 +377,20 @@ async def open_jsonrpc_session(
url: str,
start_id: int = 0,
response_type: type = JSONRPCResult,
request_type: Optional[type] = None,
request_hook: Optional[Callable] = None,
error_hook: Optional[Callable] = None,
# request_type: Optional[type] = None,
# request_hook: Optional[Callable] = None,
# error_hook: Optional[Callable] = None,
) -> Callable[[str, dict], dict]:
# NOTE, store all request msgs so we can raise errors on the
# caller side!
req_msgs: dict[int, dict] = {}
async with (
trio.open_nursery() as n,
open_autorecon_ws(url) as ws
):
rpc_id: Iterable = count(start_id)
rpc_id: Iterable[int] = count(start_id)
rpc_results: dict[int, dict] = {}
async def json_rpc(method: str, params: dict) -> dict:
@ -394,26 +398,40 @@ async def open_jsonrpc_session(
perform a json rpc call and wait for the result, raise exception in
case of error field present on response
'''
nonlocal req_msgs
req_id: int = next(rpc_id)
msg = {
'jsonrpc': '2.0',
'id': next(rpc_id),
'id': req_id,
'method': method,
'params': params
}
_id = msg['id']
rpc_results[_id] = {
result = rpc_results[_id] = {
'result': None,
'event': trio.Event()
'error': None,
'event': trio.Event(), # signal caller resp arrived
}
req_msgs[_id] = msg
await ws.send_msg(msg)
# wait for reponse before unblocking requester code
await rpc_results[_id]['event'].wait()
ret = rpc_results[_id]['result']
if (maybe_result := result['result']):
ret = maybe_result
del rpc_results[_id]
del rpc_results[_id]
else:
err = result['error']
raise Exception(
f'JSONRPC request failed\n'
f'req: {msg}\n'
f'resp: {err}\n'
)
if ret.error is not None:
raise Exception(json.dumps(ret.error, indent=4))
@ -428,6 +446,7 @@ async def open_jsonrpc_session(
the server side.
'''
nonlocal req_msgs
async for msg in ws:
match msg:
case {
@ -451,15 +470,29 @@ async def open_jsonrpc_session(
'params': _,
}:
log.debug(f'Recieved\n{msg}')
if request_hook:
await request_hook(request_type(**msg))
# if request_hook:
# await request_hook(request_type(**msg))
case {
'error': error
}:
log.warning(f'Recieved\n{error}')
if error_hook:
await error_hook(response_type(**msg))
# if error_hook:
# await error_hook(response_type(**msg))
# retreive orig request msg, set error
# response in original "result" msg,
# THEN FINALLY set the event to signal caller
# to raise the error in the parent task.
req_id: int = msg['id']
req_msg: dict = req_msgs[req_id]
result: dict = rpc_results[req_id]
result['error'] = error
result['event'].set()
log.error(
f'JSONRPC request failed\n'
f'req: {req_msg}\n'
f'resp: {error}\n'
)
case _:
log.warning(f'Unhandled JSON-RPC msg!?\n{msg}')

957
poetry.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -69,6 +69,8 @@ pdbp = "^1.5.0"
trio = "^0.24"
pendulum = "^3.0.0"
httpx = "^0.27.0"
cryptofeed = "^2.4.0"
pyarrow = "^17.0.0"
[tool.poetry.dependencies.tractor]
develop = true