Compare commits
5 Commits
9232d09440
...
28e8628c61
Author | SHA1 | Date |
---|---|---|
Tyler Goodlet | 28e8628c61 | |
Tyler Goodlet | b734245183 | |
Tyler Goodlet | dc2c379d86 | |
Tyler Goodlet | be84d0dae1 | |
Tyler Goodlet | bdc3bc9219 |
|
@ -28,6 +28,8 @@ from decimal import (
|
|||
Decimal,
|
||||
)
|
||||
from functools import partial
|
||||
from pathlib import Path
|
||||
from pprint import pformat
|
||||
import time
|
||||
from typing import (
|
||||
Any,
|
||||
|
@ -37,8 +39,6 @@ from typing import (
|
|||
|
||||
from pendulum import now
|
||||
import trio
|
||||
from trio_typing import TaskStatus
|
||||
from rapidfuzz import process as fuzzy
|
||||
import numpy as np
|
||||
from tractor.trionics import (
|
||||
broadcast_receiver,
|
||||
|
@ -55,8 +55,10 @@ from cryptofeed.defines import (
|
|||
OPTION, CALL, PUT
|
||||
)
|
||||
from cryptofeed.symbols import Symbol
|
||||
|
||||
# types for managing the cb callbacks.
|
||||
# from cryptofeed.types import L1Book
|
||||
from piker.brokers import SymbolNotFound
|
||||
from .venues import (
|
||||
_ws_url,
|
||||
MarketType,
|
||||
|
@ -64,9 +66,9 @@ from .venues import (
|
|||
Pair,
|
||||
OptionPair,
|
||||
JSONRPCResult,
|
||||
JSONRPCChannel,
|
||||
# JSONRPCChannel,
|
||||
KLinesResult,
|
||||
Trade,
|
||||
# Trade,
|
||||
LastTradesResult,
|
||||
)
|
||||
from piker.accounting import (
|
||||
|
@ -77,7 +79,7 @@ from piker.accounting import (
|
|||
from piker.data import (
|
||||
def_iohlcv_fields,
|
||||
match_from_pairs,
|
||||
Struct,
|
||||
# Struct,
|
||||
)
|
||||
from piker.data._web_bs import (
|
||||
open_jsonrpc_session
|
||||
|
@ -97,7 +99,7 @@ _spawn_kwargs = {
|
|||
|
||||
|
||||
# convert datetime obj timestamp to unixtime in milliseconds
|
||||
def deribit_timestamp(when):
|
||||
def deribit_timestamp(when) -> int:
|
||||
return int((when.timestamp() * 1000) + (when.microsecond / 1000))
|
||||
|
||||
|
||||
|
@ -179,16 +181,18 @@ def get_config() -> dict[str, Any]:
|
|||
|
||||
conf: dict
|
||||
path: Path
|
||||
|
||||
conf, path = config.load(
|
||||
conf_name='brokers',
|
||||
touch_if_dne=True,
|
||||
)
|
||||
section: dict = {}
|
||||
section = conf.get('deribit')
|
||||
section: dict|None = conf.get('deribit')
|
||||
if section is None:
|
||||
log.warning(f'No config section found for deribit in {path}')
|
||||
return {}
|
||||
raise ValueError(
|
||||
f'No `[deribit]` section found in\n'
|
||||
f'{path!r}\n\n'
|
||||
f'See the template config from the core repo for samples..\n'
|
||||
# f'<TODO put repo link here??>'
|
||||
)
|
||||
|
||||
conf_option = section.get('option', {})
|
||||
section.clear # clear the dict to reuse it
|
||||
|
@ -223,8 +227,12 @@ class Client:
|
|||
self._auth_ts = None
|
||||
self._auth_renew_ts = 5 # seconds to renew auth
|
||||
|
||||
async def _json_rpc_auth_wrapper(self, *args, **kwargs) -> JSONRPCResult:
|
||||
|
||||
async def _json_rpc_auth_wrapper(
|
||||
self,
|
||||
*args,
|
||||
**kwargs,
|
||||
) -> JSONRPCResult:
|
||||
|
||||
"""Background task that adquires a first access token and then will
|
||||
refresh the access token.
|
||||
|
||||
|
@ -250,9 +258,6 @@ class Client:
|
|||
|
||||
return await self.json_rpc(*args, **kwargs)
|
||||
|
||||
|
||||
|
||||
|
||||
async def get_balances(
|
||||
self,
|
||||
kind: str = 'option'
|
||||
|
@ -277,23 +282,29 @@ class Client:
|
|||
venue: str | None = None,
|
||||
|
||||
) -> dict[str, Asset]:
|
||||
"""Return the set of asset balances for this account
|
||||
by symbol.
|
||||
"""
|
||||
'''
|
||||
Return the set of asset balances for this account
|
||||
by (deribit's) symbol.
|
||||
|
||||
|
||||
'''
|
||||
assets = {}
|
||||
resp = await self._json_rpc_auth_wrapper(
|
||||
'public/get_currencies',
|
||||
params={}
|
||||
)
|
||||
currencies = resp.result
|
||||
currencies: list[dict] = resp.result
|
||||
for currency in currencies:
|
||||
name = currency['currency']
|
||||
tx_tick = digits_to_dec(currency['fee_precision'])
|
||||
atype='crypto_currency'
|
||||
name: str = currency['currency']
|
||||
tx_tick: Decimal = digits_to_dec(currency['fee_precision'])
|
||||
|
||||
# TODO, handling of options, futures, perps etc. more
|
||||
# specifically with diff `.atype`s?
|
||||
assets[name] = Asset(
|
||||
name=name,
|
||||
atype=atype,
|
||||
tx_tick=tx_tick)
|
||||
atype='crypto_currency',
|
||||
tx_tick=tx_tick,
|
||||
)
|
||||
|
||||
instruments = await self.symbol_info(currency=name)
|
||||
for instrument in instruments:
|
||||
|
@ -301,9 +312,10 @@ class Client:
|
|||
assets[pair.symbol] = Asset(
|
||||
name=pair.symbol,
|
||||
atype=pair.venue,
|
||||
tx_tick=pair.size_tick)
|
||||
tx_tick=pair.size_tick,
|
||||
)
|
||||
|
||||
return assets
|
||||
return assets
|
||||
|
||||
async def get_mkt_pairs(self) -> dict[str, Pair]:
|
||||
flat: dict[str, Pair] = {}
|
||||
|
@ -358,6 +370,19 @@ class Client:
|
|||
return cached_pair
|
||||
|
||||
if sym:
|
||||
opt: OptionPair|None = pair_table.get(sym)
|
||||
if not opt:
|
||||
closest_matches: dict[str, Pair] = match_from_pairs(
|
||||
pairs=pair_table,
|
||||
query=sym,
|
||||
score_cutoff=40,
|
||||
)
|
||||
closest_syms: list[str] = list(closest_matches.keys())
|
||||
raise ValueError(
|
||||
f'No contract found for {sym!r}\n\n'
|
||||
f'Closest {len(closest_syms)} available contracts:\n\n'
|
||||
f'{pformat(closest_syms)}\n'
|
||||
)
|
||||
return pair_table[sym]
|
||||
else:
|
||||
return self._pairs
|
||||
|
@ -381,7 +406,7 @@ class Client:
|
|||
params: dict[str, str] = {
|
||||
'currency': currency.upper(),
|
||||
'kind': kind,
|
||||
'expired': str(expired).lower()
|
||||
'expired': expired,
|
||||
}
|
||||
|
||||
resp: JSONRPCResult = await self._json_rpc_auth_wrapper(
|
||||
|
@ -389,9 +414,9 @@ class Client:
|
|||
params,
|
||||
)
|
||||
# convert to symbol-keyed table
|
||||
pair_type: Type = PAIRTYPES[kind]
|
||||
pair_type: Pair = PAIRTYPES[kind]
|
||||
results: list[dict] | None = resp.result
|
||||
|
||||
|
||||
instruments: dict[str, Pair] = {}
|
||||
for item in results:
|
||||
symbol=item['instrument_name'].lower()
|
||||
|
@ -427,12 +452,15 @@ class Client:
|
|||
mkt_pairs = await self.symbol_info()
|
||||
|
||||
if not mkt_pairs:
|
||||
raise SymbolNotFound(f'No market pairs found!?:\n{resp}')
|
||||
raise SymbolNotFound(
|
||||
f'No market pairs found!?:\n'
|
||||
f'{mkt_pairs}'
|
||||
)
|
||||
|
||||
pairs_view_subtable: dict[str, Pair] = {}
|
||||
|
||||
for instrument in mkt_pairs:
|
||||
pair_type: Type = PAIRTYPES[venue]
|
||||
pair_type: Pair|OptionPair = PAIRTYPES[venue]
|
||||
|
||||
pair: Pair = pair_type(**mkt_pairs[instrument].to_dict())
|
||||
|
||||
|
@ -480,12 +508,14 @@ class Client:
|
|||
if end_dt is None:
|
||||
end_dt = now('UTC')
|
||||
|
||||
_orig_start_dt = start_dt
|
||||
if start_dt is None:
|
||||
start_dt = end_dt.start_of(
|
||||
'minute').subtract(minutes=limit)
|
||||
'minute'
|
||||
).subtract(minutes=limit)
|
||||
|
||||
start_time = deribit_timestamp(start_dt)
|
||||
end_time = deribit_timestamp(end_dt)
|
||||
start_time: int = deribit_timestamp(start_dt)
|
||||
end_time: int = deribit_timestamp(end_dt)
|
||||
|
||||
# https://docs.deribit.com/#public-get_tradingview_chart_data
|
||||
resp = await self._json_rpc_auth_wrapper(
|
||||
|
@ -499,9 +529,13 @@ class Client:
|
|||
|
||||
result = KLinesResult(**resp.result)
|
||||
new_bars: list[tuple] = []
|
||||
for i in range(len(result.close)):
|
||||
# if _orig_start_dt is None:
|
||||
# if not new_bars:
|
||||
# import tractor
|
||||
# await tractor.pause()
|
||||
|
||||
row = [
|
||||
for i in range(len(result.close)):
|
||||
row = [
|
||||
(start_time + (i * (60 * 1000))) / 1000.0, # time
|
||||
result.open[i],
|
||||
result.high[i],
|
||||
|
@ -668,68 +702,69 @@ async def maybe_open_price_feed(
|
|||
|
||||
|
||||
|
||||
async def aio_order_feed_relay(
|
||||
fh: FeedHandler,
|
||||
instrument: Symbol,
|
||||
from_trio: asyncio.Queue,
|
||||
to_trio: trio.abc.SendChannel,
|
||||
) -> None:
|
||||
async def _fill(data: dict, receipt_timestamp):
|
||||
breakpoint()
|
||||
# TODO, move all to `.broker` submod!
|
||||
# async def aio_order_feed_relay(
|
||||
# fh: FeedHandler,
|
||||
# instrument: Symbol,
|
||||
# from_trio: asyncio.Queue,
|
||||
# to_trio: trio.abc.SendChannel,
|
||||
# ) -> None:
|
||||
# async def _fill(data: dict, receipt_timestamp):
|
||||
# breakpoint()
|
||||
|
||||
async def _order_info(data: dict, receipt_timestamp):
|
||||
breakpoint()
|
||||
# async def _order_info(data: dict, receipt_timestamp):
|
||||
# breakpoint()
|
||||
|
||||
fh.add_feed(
|
||||
DERIBIT,
|
||||
channels=[FILLS, ORDER_INFO],
|
||||
symbols=[instrument.upper()],
|
||||
callbacks={
|
||||
FILLS: _fill,
|
||||
ORDER_INFO: _order_info,
|
||||
})
|
||||
# fh.add_feed(
|
||||
# DERIBIT,
|
||||
# channels=[FILLS, ORDER_INFO],
|
||||
# symbols=[instrument.upper()],
|
||||
# callbacks={
|
||||
# FILLS: _fill,
|
||||
# ORDER_INFO: _order_info,
|
||||
# })
|
||||
|
||||
if not fh.running:
|
||||
fh.run(
|
||||
start_loop=False,
|
||||
install_signal_handlers=False)
|
||||
# if not fh.running:
|
||||
# fh.run(
|
||||
# start_loop=False,
|
||||
# install_signal_handlers=False)
|
||||
|
||||
# sync with trio
|
||||
to_trio.send_nowait(None)
|
||||
# # sync with trio
|
||||
# to_trio.send_nowait(None)
|
||||
|
||||
await asyncio.sleep(float('inf'))
|
||||
# await asyncio.sleep(float('inf'))
|
||||
|
||||
|
||||
@acm
|
||||
async def open_order_feed(
|
||||
instrument: list[str]
|
||||
) -> trio.abc.ReceiveStream:
|
||||
async with maybe_open_feed_handler() as fh:
|
||||
async with to_asyncio.open_channel_from(
|
||||
partial(
|
||||
aio_order_feed_relay,
|
||||
fh,
|
||||
instrument
|
||||
)
|
||||
) as (first, chan):
|
||||
yield chan
|
||||
# @acm
|
||||
# async def open_order_feed(
|
||||
# instrument: list[str]
|
||||
# ) -> trio.abc.ReceiveStream:
|
||||
# async with maybe_open_feed_handler() as fh:
|
||||
# async with to_asyncio.open_channel_from(
|
||||
# partial(
|
||||
# aio_order_feed_relay,
|
||||
# fh,
|
||||
# instrument
|
||||
# )
|
||||
# ) as (first, chan):
|
||||
# yield chan
|
||||
|
||||
|
||||
@acm
|
||||
async def maybe_open_order_feed(
|
||||
instrument: str
|
||||
) -> trio.abc.ReceiveStream:
|
||||
# @acm
|
||||
# async def maybe_open_order_feed(
|
||||
# instrument: str
|
||||
# ) -> trio.abc.ReceiveStream:
|
||||
|
||||
# TODO: add a predicate to maybe_open_context
|
||||
async with maybe_open_context(
|
||||
acm_func=open_order_feed,
|
||||
kwargs={
|
||||
'instrument': instrument.split('.')[0],
|
||||
'fh': fh
|
||||
},
|
||||
key=f'{instrument.split('.')[0]}-order',
|
||||
) as (cache_hit, feed):
|
||||
if cache_hit:
|
||||
yield broadcast_receiver(feed, 10)
|
||||
else:
|
||||
yield feed
|
||||
# # TODO: add a predicate to maybe_open_context
|
||||
# async with maybe_open_context(
|
||||
# acm_func=open_order_feed,
|
||||
# kwargs={
|
||||
# 'instrument': instrument.split('.')[0],
|
||||
# 'fh': fh
|
||||
# },
|
||||
# key=f'{instrument.split('.')[0]}-order',
|
||||
# ) as (cache_hit, feed):
|
||||
# if cache_hit:
|
||||
# yield broadcast_receiver(feed, 10)
|
||||
# else:
|
||||
# yield feed
|
||||
|
|
|
@ -18,56 +18,65 @@
|
|||
Deribit backend.
|
||||
|
||||
'''
|
||||
from __future__ import annotations
|
||||
from contextlib import asynccontextmanager as acm
|
||||
from datetime import datetime
|
||||
from typing import Any, Optional, Callable
|
||||
from pprint import pformat
|
||||
from typing import (
|
||||
# Any,
|
||||
# Optional,
|
||||
Callable,
|
||||
)
|
||||
# from pprint import pformat
|
||||
import time
|
||||
|
||||
import trio
|
||||
from trio_typing import TaskStatus
|
||||
from pendulum import (
|
||||
from_timestamp,
|
||||
now,
|
||||
)
|
||||
from rapidfuzz import process as fuzzy
|
||||
import numpy as np
|
||||
import tractor
|
||||
|
||||
from piker.accounting import (
|
||||
Asset,
|
||||
MktPair,
|
||||
unpack_fqme,
|
||||
)
|
||||
from piker.brokers import (
|
||||
open_cached_client,
|
||||
NoData,
|
||||
DataUnavailable,
|
||||
)
|
||||
from piker._cacheables import (
|
||||
async_lifo_cache,
|
||||
)
|
||||
from piker.log import get_logger, get_console_log
|
||||
from piker.data import ShmArray
|
||||
from piker.log import (
|
||||
get_logger,
|
||||
)
|
||||
from piker.data.validate import FeedInit
|
||||
from piker.brokers._util import (
|
||||
BrokerError,
|
||||
DataUnavailable,
|
||||
)
|
||||
|
||||
from cryptofeed import FeedHandler
|
||||
from cryptofeed.defines import (
|
||||
DERIBIT, L1_BOOK, TRADES, OPTION, CALL, PUT
|
||||
)
|
||||
from cryptofeed.symbols import Symbol
|
||||
# from cryptofeed import FeedHandler
|
||||
# from cryptofeed.defines import (
|
||||
# DERIBIT,
|
||||
# L1_BOOK,
|
||||
# TRADES,
|
||||
# OPTION,
|
||||
# CALL,
|
||||
# PUT,
|
||||
# )
|
||||
# from cryptofeed.symbols import Symbol
|
||||
|
||||
from .api import (
|
||||
Client, Trade,
|
||||
get_config,
|
||||
piker_sym_to_cb_sym, cb_sym_to_deribit_inst,
|
||||
Client,
|
||||
# get_config,
|
||||
piker_sym_to_cb_sym,
|
||||
cb_sym_to_deribit_inst,
|
||||
maybe_open_price_feed
|
||||
)
|
||||
from .venues import (
|
||||
Pair,
|
||||
OptionPair,
|
||||
Trade,
|
||||
)
|
||||
|
||||
_spawn_kwargs = {
|
||||
|
@ -86,6 +95,10 @@ async def open_history_client(
|
|||
# TODO implement history getter for the new storage layer.
|
||||
async with open_cached_client('deribit') as client:
|
||||
|
||||
pair: OptionPair = client._pairs[mkt.dst.name]
|
||||
# XXX NOTE, the cuckers use ms !!!
|
||||
creation_time_s: int = pair.creation_timestamp/1000
|
||||
|
||||
async def get_ohlc(
|
||||
timeframe: float,
|
||||
end_dt: datetime | None = None,
|
||||
|
@ -105,6 +118,31 @@ async def open_history_client(
|
|||
end_dt=end_dt,
|
||||
)
|
||||
if len(array) == 0:
|
||||
if (
|
||||
end_dt is None
|
||||
):
|
||||
raise DataUnavailable(
|
||||
'No history seems to exist yet?\n\n'
|
||||
f'{mkt}'
|
||||
)
|
||||
elif (
|
||||
end_dt
|
||||
and
|
||||
end_dt.timestamp() < creation_time_s
|
||||
):
|
||||
# the contract can't have history
|
||||
# before it was created.
|
||||
pair_type_str: str = type(pair).__name__
|
||||
create_dt: datetime = from_timestamp(creation_time_s)
|
||||
raise DataUnavailable(
|
||||
f'No history prior to\n'
|
||||
f'`{pair_type_str}.creation_timestamp: int = '
|
||||
f'{pair.creation_timestamp}\n\n'
|
||||
f'------ deribit sux ------\n'
|
||||
f'WHICH IN "NORMAL PEOPLE WHO USE EPOCH TIME" form is,\n'
|
||||
f'creation_time_s: {creation_time_s}\n'
|
||||
f'create_dt: {create_dt}\n'
|
||||
)
|
||||
raise NoData(
|
||||
f'No frame for {start_dt} -> {end_dt}\n'
|
||||
)
|
||||
|
@ -126,14 +164,20 @@ async def open_history_client(
|
|||
|
||||
return array, start_dt, end_dt
|
||||
|
||||
yield get_ohlc, {'erlangs': 3, 'rate': 3}
|
||||
yield (
|
||||
get_ohlc,
|
||||
{ # backfill config
|
||||
'erlangs': 3,
|
||||
'rate': 3,
|
||||
}
|
||||
)
|
||||
|
||||
|
||||
@async_lifo_cache()
|
||||
async def get_mkt_info(
|
||||
fqme: str,
|
||||
|
||||
) -> tuple[MktPair, Pair] | None:
|
||||
) -> tuple[MktPair, Pair|OptionPair] | None:
|
||||
|
||||
# uppercase since kraken bs_mktid is always upper
|
||||
if 'deribit' not in fqme.lower():
|
||||
|
@ -149,7 +193,7 @@ async def get_mkt_info(
|
|||
# returns, always!
|
||||
expiry: str = expiry.upper()
|
||||
venue: str = venue.upper()
|
||||
venue_lower: str = venue.lower()
|
||||
# venue_lower: str = venue.lower()
|
||||
|
||||
mkt_mode: str = 'option'
|
||||
|
||||
|
@ -195,8 +239,6 @@ async def stream_quotes(
|
|||
task_status: TaskStatus[tuple[dict, dict]] = trio.TASK_STATUS_IGNORED,
|
||||
|
||||
) -> None:
|
||||
# XXX: required to propagate ``tractor`` loglevel to piker logging
|
||||
get_console_log(loglevel or tractor.current_actor().loglevel)
|
||||
|
||||
sym = symbols[0].split('.')[0]
|
||||
|
||||
|
@ -217,7 +259,8 @@ async def stream_quotes(
|
|||
|
||||
async with maybe_open_price_feed(sym) as stream:
|
||||
|
||||
cache = client._pairs
|
||||
# TODO, uhh use it ?? XD
|
||||
# cache = client._pairs
|
||||
|
||||
last_trades = (await client.last_trades(
|
||||
cb_sym_to_deribit_inst(nsym), count=1)).trades
|
||||
|
@ -247,9 +290,16 @@ async def stream_quotes(
|
|||
|
||||
feed_is_live.set()
|
||||
|
||||
# deliver until cancelled
|
||||
async for typ, quote in stream:
|
||||
topic = quote['symbol']
|
||||
await send_chan.send({topic: quote})
|
||||
topic: str = quote['symbol']
|
||||
log.info(
|
||||
f'deribit {typ!r} quote\n\n'
|
||||
f'{quote}\n'
|
||||
)
|
||||
await send_chan.send({
|
||||
topic: quote,
|
||||
})
|
||||
|
||||
|
||||
@tractor.context
|
||||
|
@ -259,13 +309,14 @@ async def open_symbol_search(
|
|||
async with open_cached_client('deribit') as client:
|
||||
|
||||
# load all symbols locally for fast search
|
||||
cache = client._pairs
|
||||
# cache = client._pairs
|
||||
await ctx.started()
|
||||
|
||||
async with ctx.open_stream() as stream:
|
||||
|
||||
pattern: str
|
||||
async for pattern in stream:
|
||||
|
||||
# NOTE: pattern fuzzy-matching is done within
|
||||
# the methd impl.
|
||||
pairs: dict[str, Pair] = await client.search_symbols(
|
||||
|
|
|
@ -22,6 +22,7 @@ from __future__ import annotations
|
|||
import pendulum
|
||||
from typing import (
|
||||
Literal,
|
||||
Optional,
|
||||
)
|
||||
from decimal import Decimal
|
||||
|
||||
|
@ -111,7 +112,6 @@ class OptionPair(Pair, frozen=True):
|
|||
block_trade_min_trade_amount: int # '25'
|
||||
block_trade_commission: float # '0.003'
|
||||
|
||||
|
||||
# NOTE: see `.data._symcache.SymbologyCache.load()` for why
|
||||
ns_path: str = 'piker.brokers.deribit:OptionPair'
|
||||
|
||||
|
@ -122,7 +122,7 @@ class OptionPair(Pair, frozen=True):
|
|||
|
||||
@property
|
||||
def venue(self) -> str:
|
||||
return 'option'
|
||||
return f'{self.instrument_type}_option'
|
||||
|
||||
@property
|
||||
def bs_fqme(self) -> str:
|
||||
|
|
|
@ -360,7 +360,7 @@ async def open_autorecon_ws(
|
|||
|
||||
'''
|
||||
JSONRPC response-request style machinery for transparent multiplexing
|
||||
of msgs over a NoBsWs.
|
||||
of msgs over a `NoBsWs`.
|
||||
|
||||
'''
|
||||
|
||||
|
@ -377,6 +377,16 @@ async def open_jsonrpc_session(
|
|||
url: str,
|
||||
start_id: int = 0,
|
||||
response_type: type = JSONRPCResult,
|
||||
msg_recv_timeout: float = float('inf'),
|
||||
# ^NOTE, since only `deribit` is using this jsonrpc stuff atm
|
||||
# and options mkts are generally "slow moving"..
|
||||
#
|
||||
# FURTHER if we break the underlying ws connection then since we
|
||||
# don't pass a `fixture` to the task that manages `NoBsWs`, i.e.
|
||||
# `_reconnect_forever()`, the jsonrpc "transport pipe" get's
|
||||
# broken and never restored with wtv init sequence is required to
|
||||
# re-establish a working req-resp session.
|
||||
|
||||
# request_type: Optional[type] = None,
|
||||
# request_hook: Optional[Callable] = None,
|
||||
# error_hook: Optional[Callable] = None,
|
||||
|
@ -388,12 +398,18 @@ async def open_jsonrpc_session(
|
|||
|
||||
async with (
|
||||
trio.open_nursery() as n,
|
||||
open_autorecon_ws(url) as ws
|
||||
open_autorecon_ws(
|
||||
url=url,
|
||||
msg_recv_timeout=msg_recv_timeout,
|
||||
) as ws
|
||||
):
|
||||
rpc_id: Iterable[int] = count(start_id)
|
||||
rpc_results: dict[int, dict] = {}
|
||||
|
||||
async def json_rpc(method: str, params: dict) -> dict:
|
||||
async def json_rpc(
|
||||
method: str,
|
||||
params: dict,
|
||||
) -> dict:
|
||||
'''
|
||||
perform a json rpc call and wait for the result, raise exception in
|
||||
case of error field present on response
|
||||
|
|
Loading…
Reference in New Issue