Compare commits

...

15 Commits

Author SHA1 Message Date
Nelson Torres 2b9300103d Deribit api key changes introduce:
- `get_timestamp_int`: added; this is the hack so we can avoid using the custom deribit date format.

- `get_currencies`: added so we could get all deribit's available currencies.

- Also a couple of format fixes.
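
A minimal sketch of the idea (the exact body lives in `.deribit.api`, whose diff is suppressed below, so the input format here is an assumption):

```python
from datetime import datetime

def get_timestamp_int(dt_str: str) -> int:
    # convert an ISO-format datetime str to the epoch-ms int the
    # deribit API actually expects (it reports/accepts ms), thus
    # side-stepping its custom date-string format entirely.
    return int(datetime.fromisoformat(dt_str).timestamp() * 1000)
```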
2025-02-17 15:44:37 -03:00
Tyler Goodlet fdde87c2d0 `deribit.feed`: fix "trade" event streaming
The main change needed to make `piker.data.feed._FeedsBus` work was
to correctly format the `'trade'` msgs with the (new schema) expected
`'ticks': list[dict]` field which,
- we compute the `piker` quote-msg-`dict` from the (now directly proxied through)
  `cryptofeed.types.Trade`'s fields inside the body of `stream_quotes()`.
- similarly, move the `'l1'` msg processing, **out of** the `asyncio`-side
  `_l1()` callback (defined as a closure in `.api.aio_price_feed_relay()`
  and passed to the `cryptofeed.FeedHandler`) and instead mod the
  callback to simply pass through the `.types.L1Book` ref directly to
  the `piker`/`trio` side task for conversion.

In support of all that,
- mask-to-drop the alt-branch to wait on a first rt event when the
  `cryptofeed.LastTradesResult.trades: list[Trade]` is empty; doesn't
  seem like this ever even happens?
- add a buncha typing, comments and doc-strs to the routines in
  `.deribit.api` including notes on where we can choose to mod the
  `.bs_fqme` for our eventually preferred `piker` style format.
- simplify some nested `@acm` enters to the new single `async with
  <tuple>` style.
- be particularly pedantic about typing
  `tractor.to_asyncio.LinkedTaskChannel`.
- bit of pep8 line-spacing fixes in `.venues`.
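
For reference, the (new schema) quote-msg shape, excerpted from the `stream_quotes()` hunk further down this diff (`bs_fqme` and `trade` as bound there):

```python
piker_quote: dict = {
    'symbol': bs_fqme,
    'last': trade.price,
    'broker_ts': time.time(),
    'ticks': [{  # the field `_FeedsBus` consumers expect
        'type': 'trade',
        'price': float(trade.price),
        'size': float(trade.amount),
        'broker_ts': trade.timestamp,
    }],
}
```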
2025-02-17 15:44:37 -03:00
Tyler Goodlet dc57569f1c `.deribit.feed`: get live quotes workin (again)
The quote-msg `'topic'` field was being set and sent as the
`OptionPair.symbol: str` value instead of as the `MktPair.bs_fqme: str`
as is required for matching on the `piker.data.feed` side. So change to
that and simplify the actual `.bs_fqme: str` value to NOT include the
ISO-format time (for now) since it's a bit ugly and longer term we need
a `piker`-fqme friendly-on-ze-eyes format/style anyway..
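
The send-side change in sketch form (excerpted from the `stream_quotes()` hunk below):

```python
# key quotes by the fqme so the `piker.data.feed` side can match them
topic: str = mkt.bs_fqme
await send_chan.send({topic: piker_quote})
```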
2025-02-17 15:44:37 -03:00
Tyler Goodlet dc2b5a37d1 Bit more `cryptofeed` adapter formatting and typing for clarity.. 2025-02-17 15:44:37 -03:00
Tyler Goodlet f1675181d8 .deribit.venues: add todo for an ideal `OptionPair.expiry` fmt/value 2025-02-17 15:44:37 -03:00
Tyler Goodlet 1ae1c3c059 Report the closest (via fuzzy match) pairs on unmatched input 2025-02-17 15:44:37 -03:00
Tyler Goodlet f3a74b279e Signal hist start using `OptionPair.creation_timestamp`
Such that the `get_hist()` query func raises `DataUnavailable` with an
explicit message regarding the start of the (option) contract's
lifetime.

Other,
- mask some unused imports (for now?)
- drop a duplicate `tractor.get_console_log()` call which was causing
  duplicate console emits (it's already setup by brokerd init now).
- comment various unused code bits i found.
- add an info log around live quotes so we can see for the moment when
  they actually occur.. XD
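
Condensed from the `get_ohlc()` hunk below; note deribit reports the timestamp in ms:

```python
creation_time_s: int = pair.creation_timestamp / 1000  # ms -> s
if (
    end_dt
    and end_dt.timestamp() < creation_time_s
):
    # the contract can't have history before it was created
    raise DataUnavailable(
        f'No history prior to {from_timestamp(creation_time_s)}'
    )
```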
2025-02-17 15:44:37 -03:00
Tyler Goodlet ce2945d6b0 `.deribit.api` bit of tidying/typing
There were some imports missing or unused as well as a variety of spots
that had grokability issues due to missing type hints.

Other tweaks as part of some more thorough manual testing:
- always raise when there's no `brokers.toml` section since the API can never
  work (no free data without keys).
- inline the `Asset.atype='crypto_currency'` field despite it maybe not
  being the best value for `OptionPair` instruments..
- tossed in a now-masked pause block for debugging history queries in
  `Client.bars()`.
- commented out all the live order ctl (internal) endpoints for now
  since they're unused.
2025-02-17 15:44:37 -03:00
Tyler Goodlet 40fbccd667 'Fix `Optional` and use `'linear/reverse'` in `OptionPair.venue`' 2025-02-17 15:44:37 -03:00
Nelson Torres 08e8fb48f4 Deribit's feed fix
- `FeedInit` for init_msgs in `stream_quotes`.

- new cache is `client_pairs`, replacing the old `client.cache_symbols`.

- `get_mkt_info` added

- `get_ohlc` fixed to comply with the new ways of the feed.
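
As now done in `stream_quotes()` (excerpt; `first_quote` is built from the last-trades summary per the hunk below):

```python
init_msgs: list[FeedInit] = []
mkt, pair = await get_mkt_info(sym)
init_msgs.append(
    FeedInit(mkt_info=mkt)
)
task_status.started((init_msgs, first_quote))
```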
2025-02-17 15:44:37 -03:00
Nelson Torres 0dcd1236c4 Deribit's api fix
key changes:

- Resolved the issue with the expiration dates from deribit; we now use an int timestamp instead of the crazy custom deribit format.

- The client now has a new `_json_rpc_auth_wrapper` that acquires a first access token and then refreshes it when it expires.

- `get_assets` fixed; we now use the public endpoint to check the available assets. In the future this will probably change, but for now it's working just fine.

- `get_mkt_pairs` added.

- `exch_info` added.

- `cache_symbols` fixed.

- Also a lot of reformatting was done in api.
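
A hedged sketch of that refresh pattern; the real `_json_rpc_auth_wrapper` lives in `.deribit.api` (whose diff is suppressed below) so the attribute names here are assumptions, though `public/auth` with `grant_type: 'client_credentials'` is deribit's documented auth endpoint:

```python
import time

async def _json_rpc_auth_wrapper(self, method: str, params: dict) -> dict:
    # (re)acquire an access token on first use or after expiry
    if (
        self._access_token is None  # hypothetical client state
        or time.time() >= self._token_expiry
    ):
        resp = await self.json_rpc(
            'public/auth',
            {
                'grant_type': 'client_credentials',
                'client_id': self._key_id,
                'client_secret': self._key_secret,
            },
        )
        self._access_token = resp.result['access_token']
        self._token_expiry = time.time() + resp.result['expires_in']

    return await self.json_rpc(method, params)
```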
2025-02-17 15:44:37 -03:00
Nelson Torres f12a6f7438 Venues
Moved all the msgspec structs from api to venues; also added critical imports in the api, feed and __init__ mods.
2025-02-17 15:44:37 -03:00
Tyler Goodlet bda23c25b9 Mk jsonrpc's underlying ws timeout `float('inf')`
Since currently we're only using this IPC subsys for `deribit`, and
generally speaking we're primarily supporting options markets (which are
fairly "slow moving"), flip to a default of NOT resetting the `NoBsWs`
on timeout since doing so normally breaks the json-rpc IPC session.
Without a proper `fixture` passed to `open_autorecon_ws()` (which we
should eventually implement!!) relying on a timeout-to-reset more or
less will just cause breakage issues - a proper reconnect sequence must
be implemented before using that feature.

Deats,
- expose and proxy through the `msg_recv_timeout` from
  `open_jsonrpc_session()` into the underlying `open_autorecon_ws()`
  call.
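
The plumbing itself is minimal, condensed from the ws-module hunk below:

```python
@acm
async def open_jsonrpc_session(
    url: str,
    msg_recv_timeout: float = float('inf'),
) -> Callable[[str, dict], dict]:
    async with (
        trio.open_nursery() as tn,
        open_autorecon_ws(
            url=url,
            msg_recv_timeout=msg_recv_timeout,
        ) as ws
    ):
        ...
```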
2025-02-17 13:40:04 -05:00
Tyler Goodlet 4c6c1029d6 Refine history gap/termination signalling
Namely handling backends which do not provide a default "frame
size-duration" in their init-config by making the backfiller guess the
value based on the first frame received.

Deats,
- adjust `start_backfill()` to take a more explicit
  `def_frame_duration: Duration` expected to be unpacked from any
  backend hist init-config by the `tsdb_backfill()` caller which now
  also computes a value from the first received frame when the config
  section isn't provided.
- in `start_backfill()` we now always expect the `def_frame_duration`
  input and always decrement the query range by this value whenever
  a `NoData` is raised by the provider-backend paired with an explicit
  `log.warning()` about the handling.
- also relay any `DataUnavailable.args[0]` message from the provider
  in the handler.
- repair "gap reporting" which checks for expected frame duration vs.
  that received with much better humanized logging on the missing
  segment using `pendulum.Interval/Duration.in_words()` output.
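
Condensed from the `tsdb_backfill()` hunk below:

```python
# prefer the backend-declared frame duration, else fall back to
# one calced from the first (most recent) frame received.
first_frame_dur_s: Duration = (mr_end_dt - mr_start_dt).seconds
calced_frame_size: Duration = mk_duration(seconds=first_frame_dur_s)

def_frame_durs: dict[int, Duration]|None = config.get('frame_types', None)
if def_frame_durs:
    def_frame_size: Duration = def_frame_durs[timeframe]
    assert def_frame_size == calced_frame_size
else:
    def_frame_size = calced_frame_size
```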
2025-02-17 13:26:09 -05:00
Tyler Goodlet 3ab9a9b741 Only use `frame_types` if delivered during enter
The `open_history_client()` provider endpoint can *optionally*
deliver a `frame_types: dict[int, pendulum.Duration]` subsection in its
`config: dict[str, dict]` (as was implemented with the `ib` backend).
This allows the `tsp` backfilling machinery to use this "recommended
frame duration" to subtract from the `last_start_dt` any time a `NoData`
gap is signalled by the `get_hist()` call allowing gaps to be ignored
safely without missing history by knowing the next earliest dt we can
query from using the `end_dt`. However, currently none of the crypto$
providers have implemented this feature yet..

As such only try to use the `frame_types` feature if provided when
handling `NoData` conditions inside `tsp.start_backfill()` and otherwise
raise as normal.
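
In sketch form (as this commit had it, before the follow-up refactor to an always-expected `def_frame_duration`):

```python
# inside `tsp.start_backfill()`'s request loop (sketched):
while last_start_dt > backfill_until_dt:
    try:
        array, next_start_dt, next_end_dt = await get_hist(
            timeframe,
            end_dt=last_start_dt,
        )
    except NoData:
        if frame_types and (dur := frame_types.get(timeframe)):
            # decrement by a frame's worth of duration and
            # retry the query for possibly-older history
            last_start_dt = last_start_dt.subtract(
                seconds=dur.total_seconds(),
            )
            continue
        raise
```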
2025-02-17 13:26:02 -05:00
7 changed files with 1102 additions and 440 deletions

View File

@ -51,6 +51,7 @@ __brokers__: list[str] = [
'ib', 'ib',
'kraken', 'kraken',
'kucoin', 'kucoin',
'deribit',
# broken but used to work # broken but used to work
# 'questrade', # 'questrade',
@ -61,7 +62,6 @@ __brokers__: list[str] = [
# wstrade # wstrade
# iex # iex
# deribit
# bitso # bitso
] ]

View File

@ -25,6 +25,7 @@ from .api import (
get_client, get_client,
) )
from .feed import ( from .feed import (
get_mkt_info,
open_history_client, open_history_client,
open_symbol_search, open_symbol_search,
stream_quotes, stream_quotes,
@ -34,15 +35,20 @@ from .feed import (
# open_trade_dialog, # open_trade_dialog,
# norm_trade_records, # norm_trade_records,
# ) # )
from .venues import (
OptionPair,
)
log = get_logger(__name__) log = get_logger(__name__)
__all__ = [ __all__ = [
'get_client', 'get_client',
# 'trades_dialogue', # 'trades_dialogue',
'get_mkt_info',
'open_history_client', 'open_history_client',
'open_symbol_search', 'open_symbol_search',
'stream_quotes', 'stream_quotes',
'OptionPair',
# 'norm_trade_records', # 'norm_trade_records',
] ]

File diff suppressed because it is too large

View File

@ -18,38 +18,59 @@
Deribit backend. Deribit backend.
''' '''
from __future__ import annotations
from contextlib import asynccontextmanager as acm from contextlib import asynccontextmanager as acm
from datetime import datetime from datetime import datetime
from typing import Any, Optional, Callable from typing import (
# Any,
# Optional,
Callable,
)
# from pprint import pformat
import time import time
import cryptofeed
import trio import trio
from trio_typing import TaskStatus from trio_typing import TaskStatus
import pendulum from pendulum import (
from rapidfuzz import process as fuzzy from_timestamp,
)
import numpy as np import numpy as np
import tractor import tractor
from piker.brokers import open_cached_client from piker.accounting import (
from piker.log import get_logger, get_console_log Asset,
from piker.data import ShmArray MktPair,
from piker.brokers._util import ( unpack_fqme,
BrokerError, )
from piker.brokers import (
open_cached_client,
NoData,
DataUnavailable, DataUnavailable,
) )
from piker._cacheables import (
from cryptofeed import FeedHandler async_lifo_cache,
from cryptofeed.defines import (
DERIBIT, L1_BOOK, TRADES, OPTION, CALL, PUT
) )
from cryptofeed.symbols import Symbol from piker.log import (
get_logger,
mk_repr,
)
from piker.data.validate import FeedInit
from .api import ( from .api import (
Client, Trade, Client,
get_config, # get_config,
str_to_cb_sym, piker_sym_to_cb_sym, cb_sym_to_deribit_inst, piker_sym_to_cb_sym,
cb_sym_to_deribit_inst,
str_to_cb_sym,
maybe_open_price_feed maybe_open_price_feed
) )
from .venues import (
Pair,
OptionPair,
Trade,
)
_spawn_kwargs = { _spawn_kwargs = {
'infect_asyncio': True, 'infect_asyncio': True,
@ -64,90 +85,215 @@ async def open_history_client(
mkt: MktPair, mkt: MktPair,
) -> tuple[Callable, int]: ) -> tuple[Callable, int]:
instrument: str = mkt.bs_fqme
# TODO implement history getter for the new storage layer. # TODO implement history getter for the new storage layer.
async with open_cached_client('deribit') as client: async with open_cached_client('deribit') as client:
pair: OptionPair = client._pairs[mkt.dst.name]
# XXX NOTE, the cuckers use ms !!!
creation_time_s: int = pair.creation_timestamp/1000
async def get_ohlc( async def get_ohlc(
end_dt: Optional[datetime] = None, timeframe: float,
start_dt: Optional[datetime] = None, end_dt: datetime | None = None,
start_dt: datetime | None = None,
) -> tuple[ ) -> tuple[
np.ndarray, np.ndarray,
datetime, # start datetime, # start
datetime, # end datetime, # end
]: ]:
if timeframe != 60:
raise DataUnavailable('Only 1m bars are supported')
array = await client.bars( array: np.ndarray = await client.bars(
instrument, mkt,
start_dt=start_dt, start_dt=start_dt,
end_dt=end_dt, end_dt=end_dt,
) )
if len(array) == 0: if len(array) == 0:
raise DataUnavailable if (
end_dt is None
):
raise DataUnavailable(
'No history seems to exist yet?\n\n'
f'{mkt}'
)
elif (
end_dt
and
end_dt.timestamp() < creation_time_s
):
# the contract can't have history
# before it was created.
pair_type_str: str = type(pair).__name__
create_dt: datetime = from_timestamp(creation_time_s)
raise DataUnavailable(
f'No history prior to\n'
f'`{pair_type_str}.creation_timestamp: int = '
f'{pair.creation_timestamp}\n\n'
f'------ deribit sux ------\n'
f'WHICH IN "NORMAL PEOPLE WHO USE EPOCH TIME" form is,\n'
f'creation_time_s: {creation_time_s}\n'
f'create_dt: {create_dt}\n'
)
raise NoData(
f'No frame for {start_dt} -> {end_dt}\n'
)
start_dt = pendulum.from_timestamp(array[0]['time']) start_dt = from_timestamp(array[0]['time'])
end_dt = pendulum.from_timestamp(array[-1]['time']) end_dt = from_timestamp(array[-1]['time'])
times = array['time']
if not times.any():
raise ValueError(
'Bad frame with null-times?\n\n'
f'{times}'
)
if end_dt is None:
inow: int = round(time.time())
if (inow - times[-1]) > 60:
await tractor.pause()
return array, start_dt, end_dt return array, start_dt, end_dt
yield get_ohlc, {'erlangs': 3, 'rate': 3} yield (
get_ohlc,
{ # backfill config
'erlangs': 3,
'rate': 3,
}
)
@async_lifo_cache()
async def get_mkt_info(
fqme: str,
) -> tuple[MktPair, Pair|OptionPair] | None:
# uppercase since kraken bs_mktid is always upper
if 'deribit' not in fqme.lower():
fqme += '.deribit'
mkt_mode: str = ''
broker, mkt_ep, venue, expiry = unpack_fqme(fqme)
# NOTE: we always upper case all tokens to be consistent with
# binance's symbology style for pairs, like `BTCUSDT`, but in
# theory we could also just keep things lower case; as long as
# we're consistent and the symcache matches whatever this func
# returns, always!
expiry: str = expiry.upper()
venue: str = venue.upper()
# venue_lower: str = venue.lower()
mkt_mode: str = 'option'
async with open_cached_client(
'deribit',
) as client:
assets: dict[str, Asset] = await client.get_assets()
pair_str: str = mkt_ep.lower()
pair: Pair = await client.exch_info(
sym=pair_str,
)
mkt_mode = pair.venue
client.mkt_mode = mkt_mode
dst: Asset | None = assets.get(pair.bs_dst_asset)
src: Asset | None = assets.get(pair.bs_src_asset)
mkt = MktPair(
dst=dst,
src=src,
price_tick=pair.price_tick,
size_tick=pair.size_tick,
bs_mktid=pair.symbol,
venue=mkt_mode,
broker='deribit',
_atype=mkt_mode,
_fqme_without_src=True,
# expiry=pair.expiry,
# XXX TODO, currently we don't use it since it's
# already "described" in the `OptionPair.symbol: str`
# and if we slap in the ISO repr it's kinda hideous..
# -[ ] figure out the best either std
)
return mkt, pair
async def stream_quotes( async def stream_quotes(
send_chan: trio.abc.SendChannel, send_chan: trio.abc.SendChannel,
symbols: list[str], symbols: list[str],
feed_is_live: trio.Event, feed_is_live: trio.Event,
loglevel: str = None,
# startup sync # startup sync
task_status: TaskStatus[tuple[dict, dict]] = trio.TASK_STATUS_IGNORED, task_status: TaskStatus[tuple[dict, dict]] = trio.TASK_STATUS_IGNORED,
) -> None: ) -> None:
# XXX: required to propagate ``tractor`` loglevel to piker logging '''
get_console_log(loglevel or tractor.current_actor().loglevel) Open a live quote stream for the market set defined by `symbols`.
sym = symbols[0] Internally this starts a `cryptofeed.FeedHandler` inside an `asyncio`-side
task and relays through L1 and `Trade` msgs here to our `trio.Task`.
'''
sym = symbols[0].split('.')[0]
init_msgs: list[FeedInit] = []
# multiline nested `dict` formatter (since rn quote-msgs are
# just that).
pfmt: Callable[[str], str] = mk_repr(
# so we can see `deribit`'s delightfully mega-long bs fields..
maxstring=100,
)
async with ( async with (
open_cached_client('deribit') as client, open_cached_client('deribit') as client,
send_chan as send_chan send_chan as send_chan
): ):
mkt: MktPair
pair: Pair
mkt, pair = await get_mkt_info(sym)
init_msgs = { # build out init msgs according to latest spec
# pass back token, and bool, signalling if we're the writer init_msgs.append(
# and that history has been written FeedInit(
sym: { mkt_info=mkt,
'symbol_info': { )
'asset_type': 'option', )
'price_tick_size': 0.0005 # build `cryptofeed` feed-handle
}, cf_sym: cryptofeed.Symbol = piker_sym_to_cb_sym(sym)
'shm_write_opts': {'sum_tick_vml': False},
'fqsn': sym,
},
}
nsym = piker_sym_to_cb_sym(sym) from_cf: tractor.to_asyncio.LinkedTaskChannel
async with maybe_open_price_feed(sym) as from_cf:
async with maybe_open_price_feed(sym) as stream: # load the "last trades" summary
last_trades_res: cryptofeed.LastTradesResult = await client.last_trades(
cb_sym_to_deribit_inst(cf_sym),
count=1,
)
last_trades: list[Trade] = last_trades_res.trades
cache = await client.cache_symbols() # TODO, do we even need this or will the above always
# work?
# if not last_trades:
# await tractor.pause()
# async for typ, quote in from_cf:
# if typ == 'trade':
# last_trade = Trade(**(quote['data']))
# break
last_trades = (await client.last_trades( # else:
cb_sym_to_deribit_inst(nsym), count=1)).trades last_trade = Trade(
**(last_trades[0])
)
if len(last_trades) == 0: first_quote: dict = {
last_trade = None
async for typ, quote in stream:
if typ == 'trade':
last_trade = Trade(**(quote['data']))
break
else:
last_trade = Trade(**(last_trades[0]))
first_quote = {
'symbol': sym, 'symbol': sym,
'last': last_trade.price, 'last': last_trade.price,
'brokerd_ts': last_trade.timestamp, 'brokerd_ts': last_trade.timestamp,
@ -158,13 +304,84 @@ async def stream_quotes(
'broker_ts': last_trade.timestamp 'broker_ts': last_trade.timestamp
}] }]
} }
task_status.started((init_msgs, first_quote)) task_status.started((
init_msgs,
first_quote,
))
feed_is_live.set() feed_is_live.set()
async for typ, quote in stream: # NOTE XXX, static for now!
topic = quote['symbol'] # => since this only handles ONE mkt feed at a time we
await send_chan.send({topic: quote}) # don't need a lookup table to map interleaved quotes
# from multiple possible mkt-pairs
topic: str = mkt.bs_fqme
# deliver until cancelled
async for typ, ref in from_cf:
match typ:
case 'trade':
trade: cryptofeed.types.Trade = ref
# TODO, re-impl this according to teh ideal
# fqme for opts that we choose!!
bs_fqme: str = cb_sym_to_deribit_inst(
str_to_cb_sym(trade.symbol)
).lower()
piker_quote: dict = {
'symbol': bs_fqme,
'last': trade.price,
'broker_ts': time.time(),
# ^TODO, name this `brokerd/datad_ts` and
# use `time.time_ns()` ??
'ticks': [{
'type': 'trade',
'price': float(trade.price),
'size': float(trade.amount),
'broker_ts': trade.timestamp,
}],
}
log.info(
f'deribit {typ!r} quote for {sym!r}\n\n'
f'{trade}\n\n'
f'{pfmt(piker_quote)}\n'
)
case 'l1':
book: cryptofeed.types.L1Book = ref
# TODO, so this is where we can possibly change things
# and instead lever the `MktPair.bs_fqme: str` output?
bs_fqme: str = cb_sym_to_deribit_inst(
str_to_cb_sym(book.symbol)
).lower()
piker_quote: dict = {
'symbol': bs_fqme,
'ticks': [
{'type': 'bid',
'price': float(book.bid_price),
'size': float(book.bid_size)},
{'type': 'bsize',
'price': float(book.bid_price),
'size': float(book.bid_size),},
{'type': 'ask',
'price': float(book.ask_price),
'size': float(book.ask_size),},
{'type': 'asize',
'price': float(book.ask_price),
'size': float(book.ask_size),}
]
}
await send_chan.send({
topic: piker_quote,
})
@tractor.context @tractor.context
@ -174,12 +391,21 @@ async def open_symbol_search(
async with open_cached_client('deribit') as client: async with open_cached_client('deribit') as client:
# load all symbols locally for fast search # load all symbols locally for fast search
cache = await client.cache_symbols() # cache = client._pairs
await ctx.started() await ctx.started()
async with ctx.open_stream() as stream: async with ctx.open_stream() as stream:
pattern: str
async for pattern in stream: async for pattern in stream:
# repack in dict form
await stream.send( # NOTE: pattern fuzzy-matching is done within
await client.search_symbols(pattern)) # the methd impl.
pairs: dict[str, Pair] = await client.search_symbols(
pattern,
)
# repack in fqme-keyed table
byfqme: dict[str, Pair] = {}
for pair in pairs.values():
byfqme[pair.bs_fqme] = pair
await stream.send(byfqme)

View File

@ -0,0 +1,196 @@
# piker: trading gear for hackers
# Copyright (C) Tyler Goodlet (in stewardship for pikers)
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
Per market data-type definitions and schemas types.
"""
from __future__ import annotations
import pendulum
from typing import (
Literal,
Optional,
)
from decimal import Decimal
from piker.types import Struct
# API endpoint paths by venue / sub-API
_domain: str = 'deribit.com'
_url = f'https://www.{_domain}'
# WEBsocketz
_ws_url: str = f'wss://www.{_domain}/ws/api/v2'
# test nets
_testnet_ws_url: str = f'wss://test.{_domain}/ws/api/v2'
MarketType = Literal[
'option'
]
def get_api_eps(venue: MarketType) -> tuple[str, str]:
'''
Return API ep root paths per venue.
'''
return {
'option': (
_ws_url,
),
}[venue]
class Pair(Struct, frozen=True, kw_only=True):
symbol: str
# src
quote_currency: str # 'BTC'
# dst
base_currency: str # "BTC",
tick_size: float # 0.0001
# [{'above_price': 0.005, 'tick_size': 0.0005}]
tick_size_steps: list[dict[str, float]]
@property
def price_tick(self) -> Decimal:
return Decimal(str(self.tick_size_steps[0]['above_price']))
@property
def size_tick(self) -> Decimal:
return Decimal(str(self.tick_size))
@property
def bs_fqme(self) -> str:
return f'{self.symbol}'
@property
def bs_mktid(self) -> str:
return f'{self.symbol}.{self.venue}'
class OptionPair(Pair, frozen=True):
taker_commission: float # 0.0003
strike: float # 5000.0
settlement_period: str # 'day'
settlement_currency: str # "BTC",
rfq: bool # false
price_index: str # 'btc_usd'
option_type: str # 'call'
min_trade_amount: float # 0.1
maker_commission: float # 0.0003
kind: str # 'option'
is_active: bool # true
instrument_type: str # 'reversed'
instrument_name: str # 'BTC-1SEP24-55000-C'
instrument_id: int # 364671
expiration_timestamp: int # 1725177600000
creation_timestamp: int # 1724918461000
counter_currency: str # 'USD'
contract_size: float # '1.0'
block_trade_tick_size: float # '0.0001'
block_trade_min_trade_amount: int # '25'
block_trade_commission: float # '0.003'
# NOTE: see `.data._symcache.SymbologyCache.load()` for why
ns_path: str = 'piker.brokers.deribit:OptionPair'
# TODO, impl this without the MM:SS part of
# the `'THH:MM:SS..'` etc..
@property
def expiry(self) -> str:
iso_date = pendulum.from_timestamp(
self.expiration_timestamp / 1000
).isoformat()
return iso_date
@property
def venue(self) -> str:
return f'{self.instrument_type}_option'
@property
def bs_fqme(self) -> str:
return f'{self.symbol}'
@property
def bs_src_asset(self) -> str:
return f'{self.quote_currency}'
@property
def bs_dst_asset(self) -> str:
return f'{self.symbol}'
PAIRTYPES: dict[MarketType, Pair] = {
'option': OptionPair,
}
class JSONRPCResult(Struct):
id: int
usIn: int
usOut: int
usDiff: int
testnet: bool
jsonrpc: str = '2.0'
error: Optional[dict] = None
result: Optional[list[dict]] = None
class JSONRPCChannel(Struct):
method: str
params: dict
jsonrpc: str = '2.0'
class KLinesResult(Struct):
low: list[float]
cost: list[float]
high: list[float]
open: list[float]
close: list[float]
ticks: list[int]
status: str
volume: list[float]
class Trade(Struct):
iv: float
price: float
amount: float
trade_id: str
contracts: float
direction: str
trade_seq: int
timestamp: int
mark_price: float
index_price: float
tick_direction: int
instrument_name: str
combo_id: Optional[str] = ''
combo_trade_id: Optional[int] = 0
block_trade_id: Optional[str] = ''
block_trade_leg_count: Optional[int] = 0
class LastTradesResult(Struct):
trades: list[Trade]
has_more: bool

View File

@ -360,7 +360,7 @@ async def open_autorecon_ws(
''' '''
JSONRPC response-request style machinery for transparent multiplexing JSONRPC response-request style machinery for transparent multiplexing
of msgs over a NoBsWs. of msgs over a `NoBsWs`.
''' '''
@ -377,6 +377,16 @@ async def open_jsonrpc_session(
url: str, url: str,
start_id: int = 0, start_id: int = 0,
response_type: type = JSONRPCResult, response_type: type = JSONRPCResult,
msg_recv_timeout: float = float('inf'),
# ^NOTE, since only `deribit` is using this jsonrpc stuff atm
# and options mkts are generally "slow moving"..
#
# FURTHER if we break the underlying ws connection then since we
# don't pass a `fixture` to the task that manages `NoBsWs`, i.e.
# `_reconnect_forever()`, the jsonrpc "transport pipe" get's
# broken and never restored with wtv init sequence is required to
# re-establish a working req-resp session.
) -> Callable[[str, dict], dict]: ) -> Callable[[str, dict], dict]:
''' '''
Init a json-RPC-over-websocket connection to the provided `url`. Init a json-RPC-over-websocket connection to the provided `url`.
@ -393,12 +403,18 @@ async def open_jsonrpc_session(
async with ( async with (
trio.open_nursery() as tn, trio.open_nursery() as tn,
open_autorecon_ws(url) as ws open_autorecon_ws(
url=url,
msg_recv_timeout=msg_recv_timeout,
) as ws
): ):
rpc_id: Iterable[int] = count(start_id) rpc_id: Iterable[int] = count(start_id)
rpc_results: dict[int, dict] = {} rpc_results: dict[int, dict] = {}
async def json_rpc(method: str, params: dict) -> dict: async def json_rpc(
method: str,
params: dict,
) -> dict:
''' '''
perform a json rpc call and wait for the result, raise exception in perform a json rpc call and wait for the result, raise exception in
case of error field present on response case of error field present on response

View File

@ -44,8 +44,10 @@ import trio
from trio_typing import TaskStatus from trio_typing import TaskStatus
import tractor import tractor
from pendulum import ( from pendulum import (
Interval,
DateTime, DateTime,
Duration, Duration,
duration as mk_duration,
from_timestamp, from_timestamp,
) )
import numpy as np import numpy as np
@ -214,7 +216,8 @@ async def maybe_fill_null_segments(
# pair, immediately stop backfilling? # pair, immediately stop backfilling?
if ( if (
start_dt start_dt
and end_dt < start_dt and
end_dt < start_dt
): ):
await tractor.pause() await tractor.pause()
break break
@ -262,6 +265,7 @@ async def maybe_fill_null_segments(
except tractor.ContextCancelled: except tractor.ContextCancelled:
# log.exception # log.exception
await tractor.pause() await tractor.pause()
raise
null_segs_detected.set() null_segs_detected.set()
# RECHECK for more null-gaps # RECHECK for more null-gaps
@ -349,7 +353,7 @@ async def maybe_fill_null_segments(
async def start_backfill( async def start_backfill(
get_hist, get_hist,
frame_types: dict[str, Duration] | None, def_frame_duration: Duration,
mod: ModuleType, mod: ModuleType,
mkt: MktPair, mkt: MktPair,
shm: ShmArray, shm: ShmArray,
@ -379,22 +383,23 @@ async def start_backfill(
update_start_on_prepend: bool = False update_start_on_prepend: bool = False
if backfill_until_dt is None: if backfill_until_dt is None:
# TODO: drop this right and just expose the backfill # TODO: per-provider default history-durations?
# limits inside a [storage] section in conf.toml? # -[ ] inside the `open_history_client()` config allow
# when no tsdb "last datum" is provided, we just load # declaring the history duration limits instead of
# some near-term history. # guessing and/or applying the same limits to all?
# periods = { #
# 1: {'days': 1}, # -[ ] allow declaring (default) per-provider backfill
# 60: {'days': 14}, # limits inside a [storage] sub-section in conf.toml?
# } #
# NOTE, when no tsdb "last datum" is provided, we just
# do a decently sized backfill and load it into storage. # load some near-term history by presuming a "decently
# large" 60s duration limit and a much shorter 1s range.
periods = { periods = {
1: {'days': 2}, 1: {'days': 2},
60: {'years': 6}, 60: {'years': 6},
} }
period_duration: int = periods[timeframe] period_duration: int = periods[timeframe]
update_start_on_prepend = True update_start_on_prepend: bool = True
# NOTE: manually set the "latest" datetime which we intend to # NOTE: manually set the "latest" datetime which we intend to
# backfill history "until" so as to adhere to the history # backfill history "until" so as to adhere to the history
@ -416,7 +421,6 @@ async def start_backfill(
f'backfill_until_dt: {backfill_until_dt}\n' f'backfill_until_dt: {backfill_until_dt}\n'
f'last_start_dt: {last_start_dt}\n' f'last_start_dt: {last_start_dt}\n'
) )
try: try:
( (
array, array,
@ -426,37 +430,58 @@ async def start_backfill(
timeframe, timeframe,
end_dt=last_start_dt, end_dt=last_start_dt,
) )
except NoData as _daterr: except NoData as _daterr:
# 3 cases: orig_last_start_dt: datetime = last_start_dt
# - frame in the middle of a legit venue gap gap_report: str = (
# - history actually began at the `last_start_dt` f'EMPTY FRAME for `end_dt: {last_start_dt}`?\n'
# - some other unknown error (ib blocking the f'{mod.name} -> tf@fqme: {timeframe}@{mkt.fqme}\n'
# history bc they don't want you seeing how they f'last_start_dt: {orig_last_start_dt}\n\n'
# cucked all the tinas..) f'bf_until: {backfill_until_dt}\n'
if dur := frame_types.get(timeframe): )
# decrement by a frame's worth of duration and # EMPTY FRAME signal with 3 (likely) causes:
# retry a few times. #
last_start_dt.subtract( # 1. range contains legit gap in venue history
seconds=dur.total_seconds() # 2. history actually (edge case) **began** at the
# value `last_start_dt`
# 3. some other unknown error (ib blocking the
# history-query bc they don't want you seeing how
# they cucked all the tinas.. like with options
# hist)
#
if def_frame_duration:
# decrement by a duration's (frame) worth of time
# as maybe indicated by the backend to see if we
# can get older data before this possible
# "history gap".
last_start_dt: datetime = last_start_dt.subtract(
seconds=def_frame_duration.total_seconds()
) )
log.warning( gap_report += (
f'{mod.name} -> EMPTY FRAME for end_dt?\n' f'Decrementing `end_dt` and retrying with,\n'
f'tf@fqme: {timeframe}@{mkt.fqme}\n' f'def_frame_duration: {def_frame_duration}\n'
'bf_until <- last_start_dt:\n' f'(new) last_start_dt: {last_start_dt}\n'
f'{backfill_until_dt} <- {last_start_dt}\n'
f'Decrementing `end_dt` by {dur} and retry..\n'
) )
log.warning(gap_report)
# skip writing to shm/tsdb and try the next
# duration's worth of prior history.
continue continue
else:
# await tractor.pause()
raise DataUnavailable(gap_report)
# broker says there never was or is no more history to pull # broker says there never was or is no more history to pull
except DataUnavailable: except DataUnavailable as due:
message: str = due.args[0]
log.warning( log.warning(
f'NO-MORE-DATA in range?\n' f'Provider {mod.name!r} halted backfill due to,\n\n'
f'`{mod.name}` halted history:\n'
f'tf@fqme: {timeframe}@{mkt.fqme}\n' f'{message}\n'
'bf_until <- last_start_dt:\n'
f'{backfill_until_dt} <- {last_start_dt}\n' f'fqme: {mkt.fqme}\n'
f'timeframe: {timeframe}\n'
f'last_start_dt: {last_start_dt}\n'
f'bf_until: {backfill_until_dt}\n'
) )
# UGH: what's a better way? # UGH: what's a better way?
# TODO: backends are responsible for being correct on # TODO: backends are responsible for being correct on
@ -465,34 +490,54 @@ async def start_backfill(
# to halt the request loop until the condition is # to halt the request loop until the condition is
# resolved or should the backend be entirely in # resolved or should the backend be entirely in
# charge of solving such faults? yes, right? # charge of solving such faults? yes, right?
# if timeframe > 1:
# await tractor.pause()
return return
time: np.ndarray = array['time']
assert ( assert (
array['time'][0] time[0]
== ==
next_start_dt.timestamp() next_start_dt.timestamp()
) )
diff = last_start_dt - next_start_dt assert time[-1] == next_end_dt.timestamp()
frame_time_diff_s = diff.seconds
expected_dur: Interval = last_start_dt - next_start_dt
# frame's worth of sample-period-steps, in seconds # frame's worth of sample-period-steps, in seconds
frame_size_s: float = len(array) * timeframe frame_size_s: float = len(array) * timeframe
expected_frame_size_s: float = frame_size_s + timeframe recv_frame_dur: Duration = (
if frame_time_diff_s > expected_frame_size_s: from_timestamp(array[-1]['time'])
-
from_timestamp(array[0]['time'])
)
if (
(lt_frame := (recv_frame_dur < expected_dur))
or
(null_frame := (frame_size_s == 0))
# ^XXX, should NEVER hit now!
):
# XXX: query result includes a start point prior to our # XXX: query result includes a start point prior to our
# expected "frame size" and thus is likely some kind of # expected "frame size" and thus is likely some kind of
# history gap (eg. market closed period, outage, etc.) # history gap (eg. market closed period, outage, etc.)
# so just report it to console for now. # so just report it to console for now.
if lt_frame:
reason = 'Possible GAP (or first-datum)'
else:
assert null_frame
reason = 'NULL-FRAME'
missing_dur: Interval = expected_dur.end - recv_frame_dur.end
log.warning( log.warning(
'GAP DETECTED:\n' f'{timeframe}s-series {reason} detected!\n'
f'last_start_dt: {last_start_dt}\n' f'fqme: {mkt.fqme}\n'
f'diff: {diff}\n' f'last_start_dt: {last_start_dt}\n\n'
f'frame_time_diff_s: {frame_time_diff_s}\n' f'recv interval: {recv_frame_dur}\n'
f'expected interval: {expected_dur}\n\n'
f'Missing duration of history of {missing_dur.in_words()!r}\n'
f'{missing_dur}\n'
) )
# await tractor.pause()
to_push = diff_history( to_push = diff_history(
array, array,
@ -567,7 +612,8 @@ async def start_backfill(
# long-term storage. # long-term storage.
if ( if (
storage is not None storage is not None
and write_tsdb and
write_tsdb
): ):
log.info( log.info(
f'Writing {ln} frame to storage:\n' f'Writing {ln} frame to storage:\n'
@ -691,7 +737,7 @@ async def back_load_from_tsdb(
last_tsdb_dt last_tsdb_dt
and latest_start_dt and latest_start_dt
): ):
backfilled_size_s = ( backfilled_size_s: Duration = (
latest_start_dt - last_tsdb_dt latest_start_dt - last_tsdb_dt
).seconds ).seconds
# if the shm buffer len is not large enough to contain # if the shm buffer len is not large enough to contain
@ -914,6 +960,8 @@ async def tsdb_backfill(
f'{pformat(config)}\n' f'{pformat(config)}\n'
) )
# concurrently load the provider's most-recent-frame AND any
# pre-existing tsdb history already saved in `piker` storage.
dt_eps: list[DateTime, DateTime] = [] dt_eps: list[DateTime, DateTime] = []
async with trio.open_nursery() as tn: async with trio.open_nursery() as tn:
tn.start_soon( tn.start_soon(
@ -924,7 +972,6 @@ async def tsdb_backfill(
timeframe, timeframe,
config, config,
) )
tsdb_entry: tuple = await load_tsdb_hist( tsdb_entry: tuple = await load_tsdb_hist(
storage, storage,
mkt, mkt,
@ -953,6 +1000,25 @@ async def tsdb_backfill(
mr_end_dt, mr_end_dt,
) = dt_eps ) = dt_eps
first_frame_dur_s: Duration = (mr_end_dt - mr_start_dt).seconds
calced_frame_size: Duration = mk_duration(
seconds=first_frame_dur_s,
)
# NOTE, attempt to use the backend declared default frame
# sizing (as allowed by their time-series query APIs) and
# if not provided try to construct a default from the
# first frame received above.
def_frame_durs: dict[
int,
Duration,
]|None = config.get('frame_types', None)
if def_frame_durs:
def_frame_size: Duration = def_frame_durs[timeframe]
assert def_frame_size == calced_frame_size
else:
# use what we calced from first frame above.
def_frame_size = calced_frame_size
# NOTE: when there's no offline data, there's 2 cases: # NOTE: when there's no offline data, there's 2 cases:
# - data backend doesn't support timeframe/sample # - data backend doesn't support timeframe/sample
# period (in which case `dt_eps` should be `None` and # period (in which case `dt_eps` should be `None` and
@ -983,7 +1049,7 @@ async def tsdb_backfill(
partial( partial(
start_backfill, start_backfill,
get_hist=get_hist, get_hist=get_hist,
frame_types=config.get('frame_types', None), def_frame_duration=def_frame_size,
mod=mod, mod=mod,
mkt=mkt, mkt=mkt,
shm=shm, shm=shm,