Compare commits

...

5 Commits

Author SHA1 Message Date
Tyler Goodlet 28e8628c61 Report the closest (via fuzzy match) pairs on unmatched input 2024-11-19 17:50:26 -05:00
Tyler Goodlet b734245183 Signal hist start using `OptionPair.creation_timestamp`
Such that the `get_hist()` query func raises `DataUnavailable` with an
explicit message regarding the start of the (option) contract's
lifetime.

Other,
- mask some unused imports (for now?)
- drop a duplicate `tractor.get_console_log()` call which was causing
  duplicate console emits (it's already setup by brokerd init now).
- comment various unused code bits i found.
- add a info log around live quotes so we can see for the moment when
  they actually occur.. XD
2024-11-19 17:45:39 -05:00
Tyler Goodlet dc2c379d86 `.deribit.api` bit of tidying/typing
There were some imports missing or unused as well as a variety of spots
that had grokability issues due to missing type hints.

Other tweaks as part some more thorough manual testing:
- always raise when not `brokers.toml` section since the API can never
  work (no free data without keys).
- inline the `Asset.atype='crypto_currency` field despite it maybe not
  being the best value for `OptionPair` instruments..
- tossed in a now-masked pause block for debugging history queries in
  `Client.bars()`.
- commented out all the live order ctl (internal) endpoints for now
  since they're unused.
2024-11-19 17:09:16 -05:00
Tyler Goodlet be84d0dae1 'Fix `Optional` and use `'linear/reverse'` in `OptionPair.venue`' 2024-11-19 17:05:13 -05:00
Tyler Goodlet bdc3bc9219 Mk jsronrpc's underlying ws timeout `float('inf')`
Since currently we're only using this IPC subsys for `deribit`, and
generally speaking we're primarly supporting options markets (which are
fairly "slow moving"), flip to a default of NOT resetting the `NoBsWs`
on timeout since doing so normally breaks the jsron-rpc IPC session.
Without a proper `fixture` passed to `open_autorecon_ws()` (which we
should eventually implement!!) relying on a timeout-to-reset more or
less will just cause breakage issues - a proper reconnect sequence must
be implemented before using that feature.

Deats,
- expose and proxy through the `msg_recv_timeout` from
  `open_jsonrpc_session()` into the underlying `open_autorecon_ws()`
  call.
2024-11-19 16:58:40 -05:00
4 changed files with 226 additions and 124 deletions

View File

@ -28,6 +28,8 @@ from decimal import (
Decimal, Decimal,
) )
from functools import partial from functools import partial
from pathlib import Path
from pprint import pformat
import time import time
from typing import ( from typing import (
Any, Any,
@ -37,8 +39,6 @@ from typing import (
from pendulum import now from pendulum import now
import trio import trio
from trio_typing import TaskStatus
from rapidfuzz import process as fuzzy
import numpy as np import numpy as np
from tractor.trionics import ( from tractor.trionics import (
broadcast_receiver, broadcast_receiver,
@ -55,8 +55,10 @@ from cryptofeed.defines import (
OPTION, CALL, PUT OPTION, CALL, PUT
) )
from cryptofeed.symbols import Symbol from cryptofeed.symbols import Symbol
# types for managing the cb callbacks. # types for managing the cb callbacks.
# from cryptofeed.types import L1Book # from cryptofeed.types import L1Book
from piker.brokers import SymbolNotFound
from .venues import ( from .venues import (
_ws_url, _ws_url,
MarketType, MarketType,
@ -64,9 +66,9 @@ from .venues import (
Pair, Pair,
OptionPair, OptionPair,
JSONRPCResult, JSONRPCResult,
JSONRPCChannel, # JSONRPCChannel,
KLinesResult, KLinesResult,
Trade, # Trade,
LastTradesResult, LastTradesResult,
) )
from piker.accounting import ( from piker.accounting import (
@ -77,7 +79,7 @@ from piker.accounting import (
from piker.data import ( from piker.data import (
def_iohlcv_fields, def_iohlcv_fields,
match_from_pairs, match_from_pairs,
Struct, # Struct,
) )
from piker.data._web_bs import ( from piker.data._web_bs import (
open_jsonrpc_session open_jsonrpc_session
@ -97,7 +99,7 @@ _spawn_kwargs = {
# convert datetime obj timestamp to unixtime in milliseconds # convert datetime obj timestamp to unixtime in milliseconds
def deribit_timestamp(when): def deribit_timestamp(when) -> int:
return int((when.timestamp() * 1000) + (when.microsecond / 1000)) return int((when.timestamp() * 1000) + (when.microsecond / 1000))
@ -179,16 +181,18 @@ def get_config() -> dict[str, Any]:
conf: dict conf: dict
path: Path path: Path
conf, path = config.load( conf, path = config.load(
conf_name='brokers', conf_name='brokers',
touch_if_dne=True, touch_if_dne=True,
) )
section: dict = {} section: dict|None = conf.get('deribit')
section = conf.get('deribit')
if section is None: if section is None:
log.warning(f'No config section found for deribit in {path}') raise ValueError(
return {} f'No `[deribit]` section found in\n'
f'{path!r}\n\n'
f'See the template config from the core repo for samples..\n'
# f'<TODO put repo link here??>'
)
conf_option = section.get('option', {}) conf_option = section.get('option', {})
section.clear # clear the dict to reuse it section.clear # clear the dict to reuse it
@ -223,7 +227,11 @@ class Client:
self._auth_ts = None self._auth_ts = None
self._auth_renew_ts = 5 # seconds to renew auth self._auth_renew_ts = 5 # seconds to renew auth
async def _json_rpc_auth_wrapper(self, *args, **kwargs) -> JSONRPCResult: async def _json_rpc_auth_wrapper(
self,
*args,
**kwargs,
) -> JSONRPCResult:
"""Background task that adquires a first access token and then will """Background task that adquires a first access token and then will
refresh the access token. refresh the access token.
@ -250,9 +258,6 @@ class Client:
return await self.json_rpc(*args, **kwargs) return await self.json_rpc(*args, **kwargs)
async def get_balances( async def get_balances(
self, self,
kind: str = 'option' kind: str = 'option'
@ -277,23 +282,29 @@ class Client:
venue: str | None = None, venue: str | None = None,
) -> dict[str, Asset]: ) -> dict[str, Asset]:
"""Return the set of asset balances for this account '''
by symbol. Return the set of asset balances for this account
""" by (deribit's) symbol.
'''
assets = {} assets = {}
resp = await self._json_rpc_auth_wrapper( resp = await self._json_rpc_auth_wrapper(
'public/get_currencies', 'public/get_currencies',
params={} params={}
) )
currencies = resp.result currencies: list[dict] = resp.result
for currency in currencies: for currency in currencies:
name = currency['currency'] name: str = currency['currency']
tx_tick = digits_to_dec(currency['fee_precision']) tx_tick: Decimal = digits_to_dec(currency['fee_precision'])
atype='crypto_currency'
# TODO, handling of options, futures, perps etc. more
# specifically with diff `.atype`s?
assets[name] = Asset( assets[name] = Asset(
name=name, name=name,
atype=atype, atype='crypto_currency',
tx_tick=tx_tick) tx_tick=tx_tick,
)
instruments = await self.symbol_info(currency=name) instruments = await self.symbol_info(currency=name)
for instrument in instruments: for instrument in instruments:
@ -301,7 +312,8 @@ class Client:
assets[pair.symbol] = Asset( assets[pair.symbol] = Asset(
name=pair.symbol, name=pair.symbol,
atype=pair.venue, atype=pair.venue,
tx_tick=pair.size_tick) tx_tick=pair.size_tick,
)
return assets return assets
@ -358,6 +370,19 @@ class Client:
return cached_pair return cached_pair
if sym: if sym:
opt: OptionPair|None = pair_table.get(sym)
if not opt:
closest_matches: dict[str, Pair] = match_from_pairs(
pairs=pair_table,
query=sym,
score_cutoff=40,
)
closest_syms: list[str] = list(closest_matches.keys())
raise ValueError(
f'No contract found for {sym!r}\n\n'
f'Closest {len(closest_syms)} available contracts:\n\n'
f'{pformat(closest_syms)}\n'
)
return pair_table[sym] return pair_table[sym]
else: else:
return self._pairs return self._pairs
@ -381,7 +406,7 @@ class Client:
params: dict[str, str] = { params: dict[str, str] = {
'currency': currency.upper(), 'currency': currency.upper(),
'kind': kind, 'kind': kind,
'expired': str(expired).lower() 'expired': expired,
} }
resp: JSONRPCResult = await self._json_rpc_auth_wrapper( resp: JSONRPCResult = await self._json_rpc_auth_wrapper(
@ -389,7 +414,7 @@ class Client:
params, params,
) )
# convert to symbol-keyed table # convert to symbol-keyed table
pair_type: Type = PAIRTYPES[kind] pair_type: Pair = PAIRTYPES[kind]
results: list[dict] | None = resp.result results: list[dict] | None = resp.result
instruments: dict[str, Pair] = {} instruments: dict[str, Pair] = {}
@ -427,12 +452,15 @@ class Client:
mkt_pairs = await self.symbol_info() mkt_pairs = await self.symbol_info()
if not mkt_pairs: if not mkt_pairs:
raise SymbolNotFound(f'No market pairs found!?:\n{resp}') raise SymbolNotFound(
f'No market pairs found!?:\n'
f'{mkt_pairs}'
)
pairs_view_subtable: dict[str, Pair] = {} pairs_view_subtable: dict[str, Pair] = {}
for instrument in mkt_pairs: for instrument in mkt_pairs:
pair_type: Type = PAIRTYPES[venue] pair_type: Pair|OptionPair = PAIRTYPES[venue]
pair: Pair = pair_type(**mkt_pairs[instrument].to_dict()) pair: Pair = pair_type(**mkt_pairs[instrument].to_dict())
@ -480,12 +508,14 @@ class Client:
if end_dt is None: if end_dt is None:
end_dt = now('UTC') end_dt = now('UTC')
_orig_start_dt = start_dt
if start_dt is None: if start_dt is None:
start_dt = end_dt.start_of( start_dt = end_dt.start_of(
'minute').subtract(minutes=limit) 'minute'
).subtract(minutes=limit)
start_time = deribit_timestamp(start_dt) start_time: int = deribit_timestamp(start_dt)
end_time = deribit_timestamp(end_dt) end_time: int = deribit_timestamp(end_dt)
# https://docs.deribit.com/#public-get_tradingview_chart_data # https://docs.deribit.com/#public-get_tradingview_chart_data
resp = await self._json_rpc_auth_wrapper( resp = await self._json_rpc_auth_wrapper(
@ -499,8 +529,12 @@ class Client:
result = KLinesResult(**resp.result) result = KLinesResult(**resp.result)
new_bars: list[tuple] = [] new_bars: list[tuple] = []
for i in range(len(result.close)): # if _orig_start_dt is None:
# if not new_bars:
# import tractor
# await tractor.pause()
for i in range(len(result.close)):
row = [ row = [
(start_time + (i * (60 * 1000))) / 1000.0, # time (start_time + (i * (60 * 1000))) / 1000.0, # time
result.open[i], result.open[i],
@ -668,68 +702,69 @@ async def maybe_open_price_feed(
async def aio_order_feed_relay( # TODO, move all to `.broker` submod!
fh: FeedHandler, # async def aio_order_feed_relay(
instrument: Symbol, # fh: FeedHandler,
from_trio: asyncio.Queue, # instrument: Symbol,
to_trio: trio.abc.SendChannel, # from_trio: asyncio.Queue,
) -> None: # to_trio: trio.abc.SendChannel,
async def _fill(data: dict, receipt_timestamp): # ) -> None:
breakpoint() # async def _fill(data: dict, receipt_timestamp):
# breakpoint()
async def _order_info(data: dict, receipt_timestamp): # async def _order_info(data: dict, receipt_timestamp):
breakpoint() # breakpoint()
fh.add_feed( # fh.add_feed(
DERIBIT, # DERIBIT,
channels=[FILLS, ORDER_INFO], # channels=[FILLS, ORDER_INFO],
symbols=[instrument.upper()], # symbols=[instrument.upper()],
callbacks={ # callbacks={
FILLS: _fill, # FILLS: _fill,
ORDER_INFO: _order_info, # ORDER_INFO: _order_info,
}) # })
if not fh.running: # if not fh.running:
fh.run( # fh.run(
start_loop=False, # start_loop=False,
install_signal_handlers=False) # install_signal_handlers=False)
# sync with trio # # sync with trio
to_trio.send_nowait(None) # to_trio.send_nowait(None)
await asyncio.sleep(float('inf')) # await asyncio.sleep(float('inf'))
@acm # @acm
async def open_order_feed( # async def open_order_feed(
instrument: list[str] # instrument: list[str]
) -> trio.abc.ReceiveStream: # ) -> trio.abc.ReceiveStream:
async with maybe_open_feed_handler() as fh: # async with maybe_open_feed_handler() as fh:
async with to_asyncio.open_channel_from( # async with to_asyncio.open_channel_from(
partial( # partial(
aio_order_feed_relay, # aio_order_feed_relay,
fh, # fh,
instrument # instrument
) # )
) as (first, chan): # ) as (first, chan):
yield chan # yield chan
@acm # @acm
async def maybe_open_order_feed( # async def maybe_open_order_feed(
instrument: str # instrument: str
) -> trio.abc.ReceiveStream: # ) -> trio.abc.ReceiveStream:
# TODO: add a predicate to maybe_open_context # # TODO: add a predicate to maybe_open_context
async with maybe_open_context( # async with maybe_open_context(
acm_func=open_order_feed, # acm_func=open_order_feed,
kwargs={ # kwargs={
'instrument': instrument.split('.')[0], # 'instrument': instrument.split('.')[0],
'fh': fh # 'fh': fh
}, # },
key=f'{instrument.split('.')[0]}-order', # key=f'{instrument.split('.')[0]}-order',
) as (cache_hit, feed): # ) as (cache_hit, feed):
if cache_hit: # if cache_hit:
yield broadcast_receiver(feed, 10) # yield broadcast_receiver(feed, 10)
else: # else:
yield feed # yield feed

View File

@ -18,56 +18,65 @@
Deribit backend. Deribit backend.
''' '''
from __future__ import annotations
from contextlib import asynccontextmanager as acm from contextlib import asynccontextmanager as acm
from datetime import datetime from datetime import datetime
from typing import Any, Optional, Callable from typing import (
from pprint import pformat # Any,
# Optional,
Callable,
)
# from pprint import pformat
import time import time
import trio import trio
from trio_typing import TaskStatus from trio_typing import TaskStatus
from pendulum import ( from pendulum import (
from_timestamp, from_timestamp,
now,
) )
from rapidfuzz import process as fuzzy
import numpy as np import numpy as np
import tractor import tractor
from piker.accounting import ( from piker.accounting import (
Asset,
MktPair, MktPair,
unpack_fqme, unpack_fqme,
) )
from piker.brokers import ( from piker.brokers import (
open_cached_client, open_cached_client,
NoData, NoData,
DataUnavailable,
) )
from piker._cacheables import ( from piker._cacheables import (
async_lifo_cache, async_lifo_cache,
) )
from piker.log import get_logger, get_console_log from piker.log import (
from piker.data import ShmArray get_logger,
)
from piker.data.validate import FeedInit from piker.data.validate import FeedInit
from piker.brokers._util import (
BrokerError,
DataUnavailable,
)
from cryptofeed import FeedHandler # from cryptofeed import FeedHandler
from cryptofeed.defines import ( # from cryptofeed.defines import (
DERIBIT, L1_BOOK, TRADES, OPTION, CALL, PUT # DERIBIT,
) # L1_BOOK,
from cryptofeed.symbols import Symbol # TRADES,
# OPTION,
# CALL,
# PUT,
# )
# from cryptofeed.symbols import Symbol
from .api import ( from .api import (
Client, Trade, Client,
get_config, # get_config,
piker_sym_to_cb_sym, cb_sym_to_deribit_inst, piker_sym_to_cb_sym,
cb_sym_to_deribit_inst,
maybe_open_price_feed maybe_open_price_feed
) )
from .venues import ( from .venues import (
Pair, Pair,
OptionPair, OptionPair,
Trade,
) )
_spawn_kwargs = { _spawn_kwargs = {
@ -86,6 +95,10 @@ async def open_history_client(
# TODO implement history getter for the new storage layer. # TODO implement history getter for the new storage layer.
async with open_cached_client('deribit') as client: async with open_cached_client('deribit') as client:
pair: OptionPair = client._pairs[mkt.dst.name]
# XXX NOTE, the cuckers use ms !!!
creation_time_s: int = pair.creation_timestamp/1000
async def get_ohlc( async def get_ohlc(
timeframe: float, timeframe: float,
end_dt: datetime | None = None, end_dt: datetime | None = None,
@ -105,6 +118,31 @@ async def open_history_client(
end_dt=end_dt, end_dt=end_dt,
) )
if len(array) == 0: if len(array) == 0:
if (
end_dt is None
):
raise DataUnavailable(
'No history seems to exist yet?\n\n'
f'{mkt}'
)
elif (
end_dt
and
end_dt.timestamp() < creation_time_s
):
# the contract can't have history
# before it was created.
pair_type_str: str = type(pair).__name__
create_dt: datetime = from_timestamp(creation_time_s)
raise DataUnavailable(
f'No history prior to\n'
f'`{pair_type_str}.creation_timestamp: int = '
f'{pair.creation_timestamp}\n\n'
f'------ deribit sux ------\n'
f'WHICH IN "NORMAL PEOPLE WHO USE EPOCH TIME" form is,\n'
f'creation_time_s: {creation_time_s}\n'
f'create_dt: {create_dt}\n'
)
raise NoData( raise NoData(
f'No frame for {start_dt} -> {end_dt}\n' f'No frame for {start_dt} -> {end_dt}\n'
) )
@ -126,14 +164,20 @@ async def open_history_client(
return array, start_dt, end_dt return array, start_dt, end_dt
yield get_ohlc, {'erlangs': 3, 'rate': 3} yield (
get_ohlc,
{ # backfill config
'erlangs': 3,
'rate': 3,
}
)
@async_lifo_cache() @async_lifo_cache()
async def get_mkt_info( async def get_mkt_info(
fqme: str, fqme: str,
) -> tuple[MktPair, Pair] | None: ) -> tuple[MktPair, Pair|OptionPair] | None:
# uppercase since kraken bs_mktid is always upper # uppercase since kraken bs_mktid is always upper
if 'deribit' not in fqme.lower(): if 'deribit' not in fqme.lower():
@ -149,7 +193,7 @@ async def get_mkt_info(
# returns, always! # returns, always!
expiry: str = expiry.upper() expiry: str = expiry.upper()
venue: str = venue.upper() venue: str = venue.upper()
venue_lower: str = venue.lower() # venue_lower: str = venue.lower()
mkt_mode: str = 'option' mkt_mode: str = 'option'
@ -195,8 +239,6 @@ async def stream_quotes(
task_status: TaskStatus[tuple[dict, dict]] = trio.TASK_STATUS_IGNORED, task_status: TaskStatus[tuple[dict, dict]] = trio.TASK_STATUS_IGNORED,
) -> None: ) -> None:
# XXX: required to propagate ``tractor`` loglevel to piker logging
get_console_log(loglevel or tractor.current_actor().loglevel)
sym = symbols[0].split('.')[0] sym = symbols[0].split('.')[0]
@ -217,7 +259,8 @@ async def stream_quotes(
async with maybe_open_price_feed(sym) as stream: async with maybe_open_price_feed(sym) as stream:
cache = client._pairs # TODO, uhh use it ?? XD
# cache = client._pairs
last_trades = (await client.last_trades( last_trades = (await client.last_trades(
cb_sym_to_deribit_inst(nsym), count=1)).trades cb_sym_to_deribit_inst(nsym), count=1)).trades
@ -247,9 +290,16 @@ async def stream_quotes(
feed_is_live.set() feed_is_live.set()
# deliver until cancelled
async for typ, quote in stream: async for typ, quote in stream:
topic = quote['symbol'] topic: str = quote['symbol']
await send_chan.send({topic: quote}) log.info(
f'deribit {typ!r} quote\n\n'
f'{quote}\n'
)
await send_chan.send({
topic: quote,
})
@tractor.context @tractor.context
@ -259,13 +309,14 @@ async def open_symbol_search(
async with open_cached_client('deribit') as client: async with open_cached_client('deribit') as client:
# load all symbols locally for fast search # load all symbols locally for fast search
cache = client._pairs # cache = client._pairs
await ctx.started() await ctx.started()
async with ctx.open_stream() as stream: async with ctx.open_stream() as stream:
pattern: str pattern: str
async for pattern in stream: async for pattern in stream:
# NOTE: pattern fuzzy-matching is done within # NOTE: pattern fuzzy-matching is done within
# the methd impl. # the methd impl.
pairs: dict[str, Pair] = await client.search_symbols( pairs: dict[str, Pair] = await client.search_symbols(

View File

@ -22,6 +22,7 @@ from __future__ import annotations
import pendulum import pendulum
from typing import ( from typing import (
Literal, Literal,
Optional,
) )
from decimal import Decimal from decimal import Decimal
@ -111,7 +112,6 @@ class OptionPair(Pair, frozen=True):
block_trade_min_trade_amount: int # '25' block_trade_min_trade_amount: int # '25'
block_trade_commission: float # '0.003' block_trade_commission: float # '0.003'
# NOTE: see `.data._symcache.SymbologyCache.load()` for why # NOTE: see `.data._symcache.SymbologyCache.load()` for why
ns_path: str = 'piker.brokers.deribit:OptionPair' ns_path: str = 'piker.brokers.deribit:OptionPair'
@ -122,7 +122,7 @@ class OptionPair(Pair, frozen=True):
@property @property
def venue(self) -> str: def venue(self) -> str:
return 'option' return f'{self.instrument_type}_option'
@property @property
def bs_fqme(self) -> str: def bs_fqme(self) -> str:

View File

@ -360,7 +360,7 @@ async def open_autorecon_ws(
''' '''
JSONRPC response-request style machinery for transparent multiplexing JSONRPC response-request style machinery for transparent multiplexing
of msgs over a NoBsWs. of msgs over a `NoBsWs`.
''' '''
@ -377,6 +377,16 @@ async def open_jsonrpc_session(
url: str, url: str,
start_id: int = 0, start_id: int = 0,
response_type: type = JSONRPCResult, response_type: type = JSONRPCResult,
msg_recv_timeout: float = float('inf'),
# ^NOTE, since only `deribit` is using this jsonrpc stuff atm
# and options mkts are generally "slow moving"..
#
# FURTHER if we break the underlying ws connection then since we
# don't pass a `fixture` to the task that manages `NoBsWs`, i.e.
# `_reconnect_forever()`, the jsonrpc "transport pipe" get's
# broken and never restored with wtv init sequence is required to
# re-establish a working req-resp session.
# request_type: Optional[type] = None, # request_type: Optional[type] = None,
# request_hook: Optional[Callable] = None, # request_hook: Optional[Callable] = None,
# error_hook: Optional[Callable] = None, # error_hook: Optional[Callable] = None,
@ -388,12 +398,18 @@ async def open_jsonrpc_session(
async with ( async with (
trio.open_nursery() as n, trio.open_nursery() as n,
open_autorecon_ws(url) as ws open_autorecon_ws(
url=url,
msg_recv_timeout=msg_recv_timeout,
) as ws
): ):
rpc_id: Iterable[int] = count(start_id) rpc_id: Iterable[int] = count(start_id)
rpc_results: dict[int, dict] = {} rpc_results: dict[int, dict] = {}
async def json_rpc(method: str, params: dict) -> dict: async def json_rpc(
method: str,
params: dict,
) -> dict:
''' '''
perform a json rpc call and wait for the result, raise exception in perform a json rpc call and wait for the result, raise exception in
case of error field present on response case of error field present on response