Compare commits


51 Commits

Author SHA1 Message Date
Tyler Goodlet bf195e8796 `deribit.feed`: fix "trade" event streaming
The main change needed to make `piker.data.feed._FeedsBus` work was
to correctly format the `'trade'` msgs with the (new schema) expected
`'ticks': list[dict]` field. To wit,
- we compute the `piker` quote-msg-`dict` from the (now directly proxied through)
  `cryptofeed.types.Trade`'s fields inside the body of `stream_quotes()`.
- similarly, move the `'l1'` msg processing, **out of** the `asyncio`-side
  `_l1()` callback (defined as a closure in `.api.aio_price_feed_relay()`
  and passed to the `cryptofeed.FeedHandler`) and instead mod the
  callback to simply pass through the `.types.L1Book` ref directly to
  the `piker`/`trio` side task for conversion.

In support of all that,
- mask-to-drop the alt-branch to wait on a first rt event when the
  `cryptofeed.LastTradesResult.trades: list[Trade]` is empty; doesn't
  seem like this ever even happens?
- add a buncha typing, comments and doc-strs to the routines in
  `.deribit.api` including notes on where we can choose to mod the
  `.bs_fqme` for our eventually preferred `piker` style format.
- simplify some nested `@acm` enters to the new single `async with
  <tuple>` style.
- be particularly pedantic about typing
  `tractor.to_asyncio.LinkedTaskChannel`.
- bit of pep8 line-spacing fixes in `.venues`.
2025-01-29 15:47:29 -03:00
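
A minimal sketch of the expected quote-msg schema described above, with hypothetical field values (the real `dict` is computed inside `stream_quotes()` from the proxied-through `cryptofeed.types.Trade` fields, see the diff below):

import time

# hypothetical trade values standing in for `cryptofeed.types.Trade` fields
trade_price: float = 0.042
trade_amount: float = 2.0
trade_ts: float = time.time()

# the (new schema) `piker` quote msg expected by `piker.data.feed._FeedsBus`
piker_quote: dict = {
    'symbol': 'btc-28mar25-60000-c',  # hypothetical `.bs_fqme`
    'last': trade_price,
    'broker_ts': time.time(),
    'ticks': [{
        'type': 'trade',
        'price': trade_price,
        'size': trade_amount,
        'broker_ts': trade_ts,
    }],
}
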
Tyler Goodlet cbe274089e Ignore non-`.parquet` (suffixed) paths for now during tsdb fs-indexing 2025-01-29 15:47:29 -03:00
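
A minimal sketch of the indexing guard, assuming `path.suffix` as an equivalent of the `'.parquet' not in str(path)` test in the storage-client diff below (`datadir` assumed a `pathlib.Path`):

for path in datadir.iterdir():
    if (
        path.name in {'borked', 'expired'}
        or
        path.suffix != '.parquet'
    ):
        # ignore all non-apache files (for now)
        continue
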
Tyler Goodlet 879cf3a9f3 Mask `ruff` config and pin `websockets = "12.0"`
- the `ruff` section in the `pyproject.toml` is somehow borked? (even
  though it def was working a while back..)
- `websockets` is completely broken in the latest version since it's
  using old-ass `asyncio` APIs of some sort, I think??
2025-01-29 15:47:29 -03:00
Tyler Goodlet ece32f8ed0 `.deribit.feed`: get live quotes workin (again)
The quote-msg `'topic'` field was being set and sent as the
`OptionPair.symbol: str` value instead of as the `MktPair.bs_fqme: str`
as is required for matching on the `piker.data.feed` side. So change to
that and simplify the actual `.bs_fqme: str` value to NOT include the
ISO-format time (for now) since it's a bit ugly, and longer term we need
a `piker`-fqme friendly-on-ze-eyes format/style anyway..
2025-01-29 15:47:29 -03:00
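
In sketch form, the fix is to key each outbound quote on the fqme the feed-bus matches on, not the raw deribit symbol:

# key quotes by the `piker`-side fqme, NOT `OptionPair.symbol`
topic: str = mkt.bs_fqme
await send_chan.send({topic: piker_quote})
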
Tyler Goodlet f3735dd712 Bit more `cryptofeed` adapter formatting and typing for clarity.. 2025-01-29 15:47:29 -03:00
Tyler Goodlet 39542fdb50 .deribit.venues: add todo for an ideal `OptionPair.expiry` fmt/value 2025-01-29 15:47:29 -03:00
Tyler Goodlet a75cfbb124 `.data._sampling`: warn about subscriber-less msgs
Since it usually means the data-provider backend is keying the msgs
incorrectly (not using the equivalent `MktPair.bs_fqme` as would
be rendered from the delivered `FeedInit.mkt` instance..) and
reporting the subs list should make it clear how the fqme matching is
off.

Deats,
- use the new `.log.mk_repr()` for a formatter.
- add a commented info emission that can be unmasked to help debug any
  such cases as mentioned in the summary ^^.
2025-01-29 15:47:29 -03:00
Tyler Goodlet 7b6410c535 Add `.log.mk_repr()` to create `reprlib.Repr`s 2025-01-29 15:47:29 -03:00
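
Usage sketch for the new helper (see the `piker/log.py` diff below; note `reprlib.Repr`'s keyword-arg ctor and `indent` attr require a recent CPython):

from piker.log import mk_repr

pfmt = mk_repr(maxstring=100)  # override any `reprlib.Repr` attr
print(pfmt({'symbol': 'btc-x', 'ticks': [{'type': 'trade'}]}))
# -> multi-line nested repr, long strings truncated at 100 chars
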
Tyler Goodlet 23e4f74c7a Report the closest (via fuzzy match) pairs on unmatched input 2025-01-29 15:47:29 -03:00
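
Condensed from the `Client` diff below, the unmatched-input handling looks like:

closest_matches: dict[str, Pair] = match_from_pairs(
    pairs=pair_table,
    query=sym,
    score_cutoff=40,
)
raise ValueError(
    f'No contract found for {sym!r}\n\n'
    f'Closest available contracts:\n\n'
    f'{pformat(list(closest_matches))}\n'
)
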
Tyler Goodlet c32d79f04b Signal hist start using `OptionPair.creation_timestamp`
Such that the `get_hist()` query func raises `DataUnavailable` with an
explicit message regarding the start of the (option) contract's
lifetime.

Other,
- mask some unused imports (for now?)
- drop a duplicate `tractor.get_console_log()` call which was causing
  duplicate console emits (it's already setup by brokerd init now).
- comment various unused code bits i found.
- add an info log around live quotes so we can see for the moment when
  they actually occur.. XD
2025-01-29 15:47:29 -03:00
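
The gist of the new guard, condensed from the `open_history_client()` diff below (deribit's `creation_timestamp` is in milliseconds):

creation_time_s: float = pair.creation_timestamp / 1000
if (
    end_dt
    and
    end_dt.timestamp() < creation_time_s
):
    # the contract can't have history before it was created
    raise DataUnavailable(
        f'No history prior to {from_timestamp(creation_time_s)}'
    )
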
Tyler Goodlet b560319043 `.deribit.api` bit of tidying/typing
There were some imports missing or unused as well as a variety of spots
that had grokability issues due to missing type hints.

Other tweaks as part some more thorough manual testing:
- always raise when there's no `brokers.toml` section since the API can
  never work (no free data without keys).
- inline the `Asset.atype='crypto_currency'` field despite it maybe not
  being the best value for `OptionPair` instruments..
- tossed in a now-masked pause block for debugging history queries in
  `Client.bars()`.
- commented out all the live order ctl (internal) endpoints for now
  since they're unused.
2025-01-29 15:47:29 -03:00
Tyler Goodlet a06e7784f1 Fix `Optional` and use `'linear/reverse'` in `OptionPair.venue` 2025-01-29 15:47:29 -03:00
Tyler Goodlet 12a13768bf Mk jsonrpc's underlying ws timeout `float('inf')`
Since currently we're only using this IPC subsys for `deribit`, and
generally speaking we're primarily supporting options markets (which are
fairly "slow moving"), flip to a default of NOT resetting the `NoBsWs`
on timeout since doing so normally breaks the json-rpc IPC session.
Without a proper `fixture` passed to `open_autorecon_ws()` (which we
should eventually implement!!) relying on a timeout-to-reset more or
less will just cause breakage issues - a proper reconnect sequence must
be implemented before using that feature.

Deats,
- expose and proxy through the `msg_recv_timeout` from
  `open_jsonrpc_session()` into the underlying `open_autorecon_ws()`
  call.
2025-01-29 15:47:29 -03:00
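
In caller terms the newly exposed knob looks like the following sketch, assuming (as in the current impl) the session yields the `json_rpc` request func:

async with open_jsonrpc_session(
    url=_ws_url,
    # don't reset the `NoBsWs` on quiet periods; options mkts are
    # slow moving and a reset breaks the json-rpc session
    msg_recv_timeout=float('inf'),
) as json_rpc:
    resp = await json_rpc(
        'public/get_currencies',
        params={},
    )
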
Tyler Goodlet 73ef2afe30 Refine history gap/termination signalling
Namely handling backends which do not provide a default "frame
size-duration" in their init-config by making the backfiller guess the
value based on the first frame received.

Deats,
- adjust `start_backfill()` to take a more explicit
  `def_frame_duration: Duration` expected to be unpacked from any
  backend hist init-config by the `tsdb_backfill()` caller which now
  also computes a value from the first received frame when the config
  section isn't provided.
- in `start_backfill()` we now always expect the `def_frame_duration`
  input and always decrement the query range by this value whenever
  a `NoData` is raised by the provider-backend paired with an explicit
  `log.warning()` about the handling.
- also relay any `DataUnavailable.args[0]` message from the provider
  in the handler.
- repair "gap reporting" which checks for expected frame duration vs.
  that received with much better humanized logging on the missing
  segment using `pendulum.Interval/Duration.in_words()` output.
2025-01-29 15:47:29 -03:00
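
A condensed sketch of the fallback computation now done in `tsdb_backfill()` (names per the `tsp` diff below):

first_frame_dur_s: float = (mr_end_dt - mr_start_dt).seconds
calced_frame_size: Duration = mk_duration(seconds=first_frame_dur_s)

def_frame_durs: dict[int, Duration]|None = config.get('frame_types')
if def_frame_durs:
    def_frame_size: Duration = def_frame_durs[timeframe]
else:
    # no backend-declared value; guess from the first frame
    def_frame_size = calced_frame_size
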
Tyler Goodlet d425d36fc3 Only use `frame_types` if delivered during enter
The `open_history_client()` provider endpoint can *optionally*
deliver a `frame_types: dict[int, pendulum.Duration]` subsection in its
`config: dict[str, dict]` (as was implemented with the `ib` backend).
This allows the `tsp` backfilling machinery to use this "recommended
frame duration" to subtract from the `last_start_dt` any time a `NoData`
gap is signalled by the `get_hist()` call allowing gaps to be ignored
safely without missing history by knowing the next earliest dt we can
query from using the `end_dt`. However, currently all crypto$ providers
haven't implemented this feat yet..

As such only try to use the `frame_types` feature if provided when
handling `NoData` conditions inside `tsp.start_backfill()` and otherwise
raise as normal.
2025-01-29 15:47:29 -03:00
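
For example, a backend's `open_history_client()` might deliver a config along these lines (duration values here are hypothetical), which `start_backfill()` then uses to step past a `NoData` gap:

from pendulum import duration

config: dict[str, dict] = {
    'frame_types': {  # recommended frame duration per sample period
        1: duration(seconds=2000),  # hypothetical limits; see the
        60: duration(days=1),       # `ib` backend for real values
    },
}

# on `NoData`, decrement the query range by one frame's worth
last_start_dt = last_start_dt.subtract(
    seconds=def_frame_size.total_seconds(),
)
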
Nelson Torres 4e963e27d0 tractor branch updated, msgspec version upgraded, cython and greenback dependencies moved under dev group 2025-01-29 15:47:29 -03:00
Nelson Torres 3dcee16bf6 config refactor
only one get_config method for the api class and the cryptofeed feed handler
2025-01-29 15:44:33 -03:00
Nelson Torres 1f41b151d7 move constants to venue 2025-01-29 15:44:33 -03:00
Nelson Torres e8c196fd88 refactor redundant code 2025-01-29 15:44:33 -03:00
Nelson Torres 7cefb202fb name formatting fixes 2025-01-29 15:44:33 -03:00
Nelson Torres ddb4c0269f get_mkt_info cleanup 2025-01-29 15:44:33 -03:00
Nelson Torres 139f62f4de cache_symbols refactor 2025-01-29 15:44:33 -03:00
Nelson Torres 338e292002 json_rpc_auth_wrapper 2025-01-29 15:44:33 -03:00
Nelson Torres 13af0a90eb move object classes to venue 2025-01-29 15:44:33 -03:00
Nelson Torres e84781ca1e Added options symbols to get_assets 2025-01-29 15:44:33 -03:00
Nelson Torres 576b15e2c6 get_assets now uses public endpoint
It's better if the data is available through a public endpoint.
2025-01-29 15:44:33 -03:00
Nelson Torres b7e54571ea now using exch_info in search_symbols 2025-01-29 15:44:33 -03:00
Nelson Torres 1a295c0c21 Fix bs_fqme using venue and expiry 2025-01-29 15:44:33 -03:00
Nelson Torres f057a20bfa Added expiry property for OptionPair 2025-01-29 15:44:33 -03:00
Nelson Torres 9dba47902e No longer needed 2025-01-29 15:44:33 -03:00
Nelson Torres e0ecef04bb bs_mktid instead of bs_fqme for deribit options 2025-01-29 15:44:33 -03:00
Nelson Torres 266347bcdb Fixed pair instrument name in the search_symbols endpoint.
Fixed instrument in the bars endpoint: for deribit options use bs_mktid instead of bs_fqme.
Fixed the id in the msg.
2025-01-29 15:44:33 -03:00
Tyler Goodlet 051d43b559 data._web_bs: try to raise jsonrpc errors in parent task 2025-01-29 15:44:33 -03:00
Nelson Torres ac3ea5d960 Add necessary classes in init file for deribit 2025-01-29 15:44:33 -03:00
Nelson Torres cd1ad13720 Minor refactor in open_symbol_search 2025-01-29 15:44:33 -03:00
Nelson Torres 941340f853 stream_quotes now using FeedInit 2025-01-29 15:44:33 -03:00
Nelson Torres b046151464 symbol_info refactor 2025-01-29 15:44:33 -03:00
Nelson Torres f3d810f3ef search_symbols output type fix 2025-01-29 15:44:33 -03:00
Nelson Torres 1c433e7bda add get_mkt_pairs method 2025-01-29 15:44:33 -03:00
Nelson Torres af0bba5667 get_assets refactor 2025-01-29 15:44:33 -03:00
Nelson Torres 28bbf11484 formatting 2025-01-29 15:44:33 -03:00
Nelson Torres d1c00de05b created exch_info in api class 2025-01-29 15:44:33 -03:00
Nelson Torres 30caac4c27 modify self_pairs type to ChainMap 2025-01-29 15:44:33 -03:00
Nelson Torres 3de985d3cd Necessary imports 2025-01-29 15:44:33 -03:00
Nelson Torres cf46671e4b add get_market_info 2025-01-29 15:44:33 -03:00
Nelson Torres 87b0ac2331 Necessary imports 2025-01-29 15:44:33 -03:00
Nelson Torres d014f6f37d minor fixes in venues 2025-01-29 15:44:33 -03:00
Nelson Torres f50e7ad5f0 add class Pair in venues, PAIRTYPES for future uses 2025-01-29 15:44:33 -03:00
Nelson Torres de55fbd44a fix syms for venues.
little refactor in get_config, and created get_fh_config for cryptofeed.
2025-01-29 15:44:33 -03:00
Nelson Torres e941afc491 venues for deribit 2025-01-29 15:44:33 -03:00
Nelson Torres 2ed8b47883 Added cryptofeed and pyarrow (necessary for the feed) and enabled
deribit in the brokers init file. At this point the feed is working;
to check the tables use the vd tool.
2025-01-29 15:44:33 -03:00
11 changed files with 1677 additions and 975 deletions

View File

@ -28,6 +28,8 @@ from decimal import (
Decimal,
)
from functools import partial
from pathlib import Path
from pprint import pformat
import time
from typing import (
Any,
@ -37,8 +39,6 @@ from typing import (
from pendulum import now
import trio
from trio_typing import TaskStatus
from rapidfuzz import process as fuzzy
import numpy as np
from tractor.trionics import (
broadcast_receiver,
@ -55,8 +55,11 @@ from cryptofeed.defines import (
OPTION, CALL, PUT
)
from cryptofeed.symbols import Symbol
# types for managing the cb callbacks.
# from cryptofeed.types import L1Book
from cryptofeed.types import (
L1Book,
Trade,
)
from piker.brokers import SymbolNotFound
from .venues import (
_ws_url,
MarketType,
@ -64,9 +67,7 @@ from .venues import (
Pair,
OptionPair,
JSONRPCResult,
JSONRPCChannel,
KLinesResult,
Trade,
LastTradesResult,
)
from piker.accounting import (
@ -77,7 +78,7 @@ from piker.accounting import (
from piker.data import (
def_iohlcv_fields,
match_from_pairs,
Struct,
# Struct,
)
from piker.data._web_bs import (
open_jsonrpc_session
@ -96,9 +97,17 @@ _spawn_kwargs = {
}
# convert datetime obj timestamp to unixtime in milliseconds
def deribit_timestamp(when):
return int((when.timestamp() * 1000) + (when.microsecond / 1000))
def deribit_timestamp(when: datetime) -> int:
'''
Convert a conventional `datetime` value to unixtime in
milliseconds.
'''
return int(
(when.timestamp() * 1000)
+
(when.microsecond / 1000)
)
def str_to_cb_sym(name: str) -> Symbol:
@ -121,14 +130,20 @@ def str_to_cb_sym(name: str) -> Symbol:
type=OPTION,
strike_price=strike_price,
option_type=option_type,
expiry_date=new_expiry_date)
expiry_date=new_expiry_date
)
def piker_sym_to_cb_sym(name: str) -> Symbol:
base, expiry_date, strike_price, option_type = tuple(
(
base,
expiry_date,
strike_price,
option_type,
)= tuple(
name.upper().split('-'))
quote = base
quote: str = base
if option_type == 'P':
option_type = PUT
@ -143,14 +158,32 @@ def piker_sym_to_cb_sym(name: str) -> Symbol:
type=OPTION,
strike_price=strike_price,
option_type=option_type,
expiry_date=expiry_date)
expiry_date=expiry_date
)
def cb_sym_to_deribit_inst(sym: Symbol):
# TODO, instead can't we just lookup the `MktPair` directly
# and pass it upward to `stream_quotes()`??
def cb_sym_to_deribit_inst(sym: Symbol) -> str:
'''
Generate our own internal `str`-repr for a `cryptofeed.Symbol`
uniquely from its fields.
This is the equiv of generating a `Pair.fqme` from `cryptofeed`
for now, I suppose..?
'''
new_expiry_date = get_values_from_cb_normalized_date(sym.expiry_date)
otype = 'C' if sym.option_type == CALL else 'P'
return f'{sym.base}-{new_expiry_date}-{sym.strike_price}-{otype}'
otype = (
'C' if sym.option_type == CALL
else 'P'
)
return (
f'{sym.base}-'
f'{new_expiry_date}-'
f'{sym.strike_price}-'
f'{otype}'
)
def get_values_from_cb_normalized_date(expiry_date: str) -> str:
@ -179,16 +212,18 @@ def get_config() -> dict[str, Any]:
conf: dict
path: Path
conf, path = config.load(
conf_name='brokers',
touch_if_dne=True,
)
section: dict = {}
section = conf.get('deribit')
section: dict|None = conf.get('deribit')
if section is None:
log.warning(f'No config section found for deribit in {path}')
return {}
raise ValueError(
f'No `[deribit]` section found in\n'
f'{path!r}\n\n'
f'See the template config from the core repo for samples..\n'
# f'<TODO put repo link here??>'
)
conf_option = section.get('option', {})
section.clear()  # clear the dict to reuse it
@ -204,7 +239,10 @@ def get_config() -> dict[str, Any]:
class Client:
'''
Hi-level interface for the JSON-RPC over websocket API.
'''
def __init__(
self,
@ -223,8 +261,12 @@ class Client:
self._auth_ts = None
self._auth_renew_ts = 5 # seconds to renew auth
async def _json_rpc_auth_wrapper(self, *args, **kwargs) -> JSONRPCResult:
async def _json_rpc_auth_wrapper(
self,
*args,
**kwargs,
) -> JSONRPCResult:
"""Background task that adquires a first access token and then will
refresh the access token.
@ -250,9 +292,6 @@ class Client:
return await self.json_rpc(*args, **kwargs)
async def get_balances(
self,
kind: str = 'option'
@ -277,23 +316,29 @@ class Client:
venue: str | None = None,
) -> dict[str, Asset]:
"""Return the set of asset balances for this account
by symbol.
"""
'''
Return the set of asset balances for this account
by (deribit's) symbol.
'''
assets = {}
resp = await self._json_rpc_auth_wrapper(
'public/get_currencies',
params={}
)
currencies = resp.result
currencies: list[dict] = resp.result
for currency in currencies:
name = currency['currency']
tx_tick = digits_to_dec(currency['fee_precision'])
atype='crypto_currency'
name: str = currency['currency']
tx_tick: Decimal = digits_to_dec(currency['fee_precision'])
# TODO, handling of options, futures, perps etc. more
# specifically with diff `.atype`s?
assets[name] = Asset(
name=name,
atype=atype,
tx_tick=tx_tick)
atype='crypto_currency',
tx_tick=tx_tick,
)
instruments = await self.symbol_info(currency=name)
for instrument in instruments:
@ -301,9 +346,10 @@ class Client:
assets[pair.symbol] = Asset(
name=pair.symbol,
atype=pair.venue,
tx_tick=pair.size_tick)
tx_tick=pair.size_tick,
)
return assets
return assets
async def get_mkt_pairs(self) -> dict[str, Pair]:
flat: dict[str, Pair] = {}
@ -358,6 +404,19 @@ class Client:
return cached_pair
if sym:
opt: OptionPair|None = pair_table.get(sym)
if not opt:
closest_matches: dict[str, Pair] = match_from_pairs(
pairs=pair_table,
query=sym,
score_cutoff=40,
)
closest_syms: list[str] = list(closest_matches.keys())
raise ValueError(
f'No contract found for {sym!r}\n\n'
f'Closest {len(closest_syms)} available contracts:\n\n'
f'{pformat(closest_syms)}\n'
)
return pair_table[sym]
else:
return self._pairs
@ -381,7 +440,7 @@ class Client:
params: dict[str, str] = {
'currency': currency.upper(),
'kind': kind,
'expired': str(expired).lower()
'expired': expired,
}
resp: JSONRPCResult = await self._json_rpc_auth_wrapper(
@ -389,9 +448,9 @@ class Client:
params,
)
# convert to symbol-keyed table
pair_type: Type = PAIRTYPES[kind]
pair_type: Pair = PAIRTYPES[kind]
results: list[dict] | None = resp.result
instruments: dict[str, Pair] = {}
for item in results:
symbol=item['instrument_name'].lower()
@ -427,12 +486,15 @@ class Client:
mkt_pairs = await self.symbol_info()
if not mkt_pairs:
raise SymbolNotFound(f'No market pairs found!?:\n{resp}')
raise SymbolNotFound(
f'No market pairs found!?:\n'
f'{mkt_pairs}'
)
pairs_view_subtable: dict[str, Pair] = {}
for instrument in mkt_pairs:
pair_type: Type = PAIRTYPES[venue]
pair_type: Pair|OptionPair = PAIRTYPES[venue]
pair: Pair = pair_type(**mkt_pairs[instrument].to_dict())
@ -480,12 +542,14 @@ class Client:
if end_dt is None:
end_dt = now('UTC')
_orig_start_dt = start_dt
if start_dt is None:
start_dt = end_dt.start_of(
'minute').subtract(minutes=limit)
'minute'
).subtract(minutes=limit)
start_time = deribit_timestamp(start_dt)
end_time = deribit_timestamp(end_dt)
start_time: int = deribit_timestamp(start_dt)
end_time: int = deribit_timestamp(end_dt)
# https://docs.deribit.com/#public-get_tradingview_chart_data
resp = await self._json_rpc_auth_wrapper(
@ -499,9 +563,13 @@ class Client:
result = KLinesResult(**resp.result)
new_bars: list[tuple] = []
for i in range(len(result.close)):
# if _orig_start_dt is None:
# if not new_bars:
# import tractor
# await tractor.pause()
row = [
for i in range(len(result.close)):
row = [
(start_time + (i * (60 * 1000))) / 1000.0, # time
result.open[i],
result.high[i],
@ -554,7 +622,7 @@ async def get_client(
@acm
async def open_feed_handler():
async def open_feed_handler() -> FeedHandler:
fh = FeedHandler(config=get_config())
yield fh
await to_asyncio.run_task(fh.stop_async)
@ -575,43 +643,37 @@ async def aio_price_feed_relay(
from_trio: asyncio.Queue,
to_trio: trio.abc.SendChannel,
) -> None:
async def _trade(data: dict, receipt_timestamp):
to_trio.send_nowait(('trade', {
'symbol': cb_sym_to_deribit_inst(
str_to_cb_sym(data.symbol)).lower(),
'last': data,
'broker_ts': time.time(),
'data': data.to_dict(),
'receipt': receipt_timestamp
}))
'''
Relay price feed quotes from the `cryptofeed.FeedHandler` to
the `piker`-side `trio.Task` for delivery to consumer
sub-actors of various subsystems.
'''
async def _trade(
trade: Trade, # cryptofeed, NOT ours from `.venues`!
receipt_timestamp: int,
) -> None:
'''
Proxy-thru `cryptofeed.FeedHandler` "trades" to `piker`-side.
'''
to_trio.send_nowait(('trade', trade))
async def _l1(
book: L1Book,
receipt_timestamp: int,
) -> None:
'''
Relay-thru "l1 book" updates.
'''
to_trio.send_nowait(('l1', book))
# TODO, make this work!
# -[ ] why isn't this working in `tractor.pause_from_sync()`??
# breakpoint()
async def _l1(data: dict, receipt_timestamp):
to_trio.send_nowait(('l1', {
'symbol': cb_sym_to_deribit_inst(
str_to_cb_sym(data.symbol)).lower(),
'ticks': [
{
'type': 'bid',
'price': float(data.bid_price),
'size': float(data.bid_size)
},
{
'type': 'bsize',
'price': float(data.bid_price),
'size': float(data.bid_size)
},
{
'type': 'ask',
'price': float(data.ask_price),
'size': float(data.ask_size)
},
{
'type': 'asize',
'price': float(data.ask_price),
'size': float(data.ask_size)
}
]
}))
sym: Symbol = piker_sym_to_cb_sym(instrument)
fh.add_feed(
DERIBIT,
@ -625,27 +687,35 @@ async def aio_price_feed_relay(
if not fh.running:
fh.run(
start_loop=False,
install_signal_handlers=False)
install_signal_handlers=False
)
# sync with trio
to_trio.send_nowait(None)
# run until cancelled
await asyncio.sleep(float('inf'))
@acm
async def open_price_feed(
instrument: str
) -> trio.abc.ReceiveStream:
async with maybe_open_feed_handler() as fh:
async with to_asyncio.open_channel_from(
) -> to_asyncio.LinkedTaskChannel:
fh: FeedHandler
first: None
chan: to_asyncio.LinkedTaskChannel
async with (
maybe_open_feed_handler() as fh,
to_asyncio.open_channel_from(
partial(
aio_price_feed_relay,
fh,
instrument
)
) as (first, chan):
yield chan
) as (first, chan)
):
yield chan
@acm
@ -654,6 +724,7 @@ async def maybe_open_price_feed(
) -> trio.abc.ReceiveStream:
# TODO: add a predicate to maybe_open_context
feed: to_asyncio.LinkedTaskChannel
async with maybe_open_context(
acm_func=open_price_feed,
kwargs={
@ -668,68 +739,69 @@ async def maybe_open_price_feed(
async def aio_order_feed_relay(
fh: FeedHandler,
instrument: Symbol,
from_trio: asyncio.Queue,
to_trio: trio.abc.SendChannel,
) -> None:
async def _fill(data: dict, receipt_timestamp):
breakpoint()
# TODO, move all to `.broker` submod!
# async def aio_order_feed_relay(
# fh: FeedHandler,
# instrument: Symbol,
# from_trio: asyncio.Queue,
# to_trio: trio.abc.SendChannel,
# ) -> None:
# async def _fill(data: dict, receipt_timestamp):
# breakpoint()
async def _order_info(data: dict, receipt_timestamp):
breakpoint()
# async def _order_info(data: dict, receipt_timestamp):
# breakpoint()
fh.add_feed(
DERIBIT,
channels=[FILLS, ORDER_INFO],
symbols=[instrument.upper()],
callbacks={
FILLS: _fill,
ORDER_INFO: _order_info,
})
# fh.add_feed(
# DERIBIT,
# channels=[FILLS, ORDER_INFO],
# symbols=[instrument.upper()],
# callbacks={
# FILLS: _fill,
# ORDER_INFO: _order_info,
# })
if not fh.running:
fh.run(
start_loop=False,
install_signal_handlers=False)
# if not fh.running:
# fh.run(
# start_loop=False,
# install_signal_handlers=False)
# sync with trio
to_trio.send_nowait(None)
# # sync with trio
# to_trio.send_nowait(None)
await asyncio.sleep(float('inf'))
# await asyncio.sleep(float('inf'))
@acm
async def open_order_feed(
instrument: list[str]
) -> trio.abc.ReceiveStream:
async with maybe_open_feed_handler() as fh:
async with to_asyncio.open_channel_from(
partial(
aio_order_feed_relay,
fh,
instrument
)
) as (first, chan):
yield chan
# @acm
# async def open_order_feed(
# instrument: list[str]
# ) -> trio.abc.ReceiveStream:
# async with maybe_open_feed_handler() as fh:
# async with to_asyncio.open_channel_from(
# partial(
# aio_order_feed_relay,
# fh,
# instrument
# )
# ) as (first, chan):
# yield chan
@acm
async def maybe_open_order_feed(
instrument: str
) -> trio.abc.ReceiveStream:
# @acm
# async def maybe_open_order_feed(
# instrument: str
# ) -> trio.abc.ReceiveStream:
# TODO: add a predicate to maybe_open_context
async with maybe_open_context(
acm_func=open_order_feed,
kwargs={
'instrument': instrument.split('.')[0],
'fh': fh
},
key=f'{instrument.split('.')[0]}-order',
) as (cache_hit, feed):
if cache_hit:
yield broadcast_receiver(feed, 10)
else:
yield feed
# # TODO: add a predicate to maybe_open_context
# async with maybe_open_context(
# acm_func=open_order_feed,
# kwargs={
# 'instrument': instrument.split('.')[0],
# 'fh': fh
# },
# key=f'{instrument.split('.')[0]}-order',
# ) as (cache_hit, feed):
# if cache_hit:
# yield broadcast_receiver(feed, 10)
# else:
# yield feed

View File

@ -18,56 +18,58 @@
Deribit backend.
'''
from __future__ import annotations
from contextlib import asynccontextmanager as acm
from datetime import datetime
from typing import Any, Optional, Callable
from pprint import pformat
from typing import (
# Any,
# Optional,
Callable,
)
# from pprint import pformat
import time
import cryptofeed
import trio
from trio_typing import TaskStatus
from pendulum import (
from_timestamp,
now,
)
from rapidfuzz import process as fuzzy
import numpy as np
import tractor
from piker.accounting import (
Asset,
MktPair,
unpack_fqme,
)
from piker.brokers import (
open_cached_client,
NoData,
DataUnavailable,
)
from piker._cacheables import (
async_lifo_cache,
)
from piker.log import get_logger, get_console_log
from piker.data import ShmArray
from piker.log import (
get_logger,
mk_repr,
)
from piker.data.validate import FeedInit
from piker.brokers._util import (
BrokerError,
DataUnavailable,
)
from cryptofeed import FeedHandler
from cryptofeed.defines import (
DERIBIT, L1_BOOK, TRADES, OPTION, CALL, PUT
)
from cryptofeed.symbols import Symbol
from .api import (
Client, Trade,
get_config,
piker_sym_to_cb_sym, cb_sym_to_deribit_inst,
Client,
# get_config,
piker_sym_to_cb_sym,
cb_sym_to_deribit_inst,
str_to_cb_sym,
maybe_open_price_feed
)
from .venues import (
Pair,
OptionPair,
Trade,
)
_spawn_kwargs = {
@ -86,6 +88,10 @@ async def open_history_client(
# TODO implement history getter for the new storage layer.
async with open_cached_client('deribit') as client:
pair: OptionPair = client._pairs[mkt.dst.name]
# XXX NOTE, the cuckers use ms !!!
creation_time_s: int = pair.creation_timestamp/1000
async def get_ohlc(
timeframe: float,
end_dt: datetime | None = None,
@ -105,6 +111,31 @@ async def open_history_client(
end_dt=end_dt,
)
if len(array) == 0:
if (
end_dt is None
):
raise DataUnavailable(
'No history seems to exist yet?\n\n'
f'{mkt}'
)
elif (
end_dt
and
end_dt.timestamp() < creation_time_s
):
# the contract can't have history
# before it was created.
pair_type_str: str = type(pair).__name__
create_dt: datetime = from_timestamp(creation_time_s)
raise DataUnavailable(
f'No history prior to\n'
f'`{pair_type_str}.creation_timestamp: int = '
f'{pair.creation_timestamp}\n\n'
f'------ deribit sux ------\n'
f'WHICH IN "NORMAL PEOPLE WHO USE EPOCH TIME" form is,\n'
f'creation_time_s: {creation_time_s}\n'
f'create_dt: {create_dt}\n'
)
raise NoData(
f'No frame for {start_dt} -> {end_dt}\n'
)
@ -126,14 +157,20 @@ async def open_history_client(
return array, start_dt, end_dt
yield get_ohlc, {'erlangs': 3, 'rate': 3}
yield (
get_ohlc,
{ # backfill config
'erlangs': 3,
'rate': 3,
}
)
@async_lifo_cache()
async def get_mkt_info(
fqme: str,
) -> tuple[MktPair, Pair] | None:
) -> tuple[MktPair, Pair|OptionPair] | None:
# uppercase since kraken bs_mktid is always upper
if 'deribit' not in fqme.lower():
@ -149,7 +186,7 @@ async def get_mkt_info(
# returns, always!
expiry: str = expiry.upper()
venue: str = venue.upper()
venue_lower: str = venue.lower()
# venue_lower: str = venue.lower()
mkt_mode: str = 'option'
@ -175,64 +212,88 @@ async def get_mkt_info(
price_tick=pair.price_tick,
size_tick=pair.size_tick,
bs_mktid=pair.symbol,
expiry=pair.expiry,
venue=mkt_mode,
broker='deribit',
_atype=mkt_mode,
_fqme_without_src=True,
# expiry=pair.expiry,
# XXX TODO, currently we don't use it since it's
# already "described" in the `OptionPair.symbol: str`
# and if we slap in the ISO repr it's kinda hideous..
# -[ ] figure out the best either std
)
return mkt, pair
async def stream_quotes(
send_chan: trio.abc.SendChannel,
symbols: list[str],
feed_is_live: trio.Event,
loglevel: str = None,
# startup sync
task_status: TaskStatus[tuple[dict, dict]] = trio.TASK_STATUS_IGNORED,
) -> None:
# XXX: required to propagate ``tractor`` loglevel to piker logging
get_console_log(loglevel or tractor.current_actor().loglevel)
'''
Open a live quote stream for the market set defined by `symbols`.
Internally this starts a `cryptofeed.FeedHandler` inside an `asyncio`-side
task and relays L1 and `Trade` msgs through to our `trio.Task`.
'''
sym = symbols[0].split('.')[0]
init_msgs: list[FeedInit] = []
# multiline nested `dict` formatter (since rn quote-msgs are
# just that).
pfmt: Callable[[str], str] = mk_repr(
# so we can see `deribit`'s delightfully mega-long bs fields..
maxstring=100,
)
async with (
open_cached_client('deribit') as client,
send_chan as send_chan
):
mkt: MktPair
pair: Pair
mkt, pair = await get_mkt_info(sym)
# build out init msgs according to latest spec
init_msgs.append(
FeedInit(mkt_info=mkt)
FeedInit(
mkt_info=mkt,
)
)
nsym = piker_sym_to_cb_sym(sym)
# build `cryptofeed` feed-handle
cf_sym: cryptofeed.Symbol = piker_sym_to_cb_sym(sym)
async with maybe_open_price_feed(sym) as stream:
from_cf: tractor.to_asyncio.LinkedTaskChannel
async with maybe_open_price_feed(sym) as from_cf:
cache = client._pairs
# load the "last trades" summary
last_trades_res: cryptofeed.LastTradesResult = await client.last_trades(
cb_sym_to_deribit_inst(cf_sym),
count=1,
)
last_trades: list[Trade] = last_trades_res.trades
last_trades = (await client.last_trades(
cb_sym_to_deribit_inst(nsym), count=1)).trades
# TODO, do we even need this or will the above always
# work?
# if not last_trades:
# await tractor.pause()
# async for typ, quote in from_cf:
# if typ == 'trade':
# last_trade = Trade(**(quote['data']))
# break
if len(last_trades) == 0:
last_trade = None
async for typ, quote in stream:
if typ == 'trade':
last_trade = Trade(**(quote['data']))
break
# else:
last_trade = Trade(
**(last_trades[0])
)
else:
last_trade = Trade(**(last_trades[0]))
first_quote = {
first_quote: dict = {
'symbol': sym,
'last': last_trade.price,
'brokerd_ts': last_trade.timestamp,
@ -243,13 +304,84 @@ async def stream_quotes(
'broker_ts': last_trade.timestamp
}]
}
task_status.started((init_msgs, first_quote))
task_status.started((
init_msgs,
first_quote,
))
feed_is_live.set()
async for typ, quote in stream:
topic = quote['symbol']
await send_chan.send({topic: quote})
# NOTE XXX, static for now!
# => since this only handles ONE mkt feed at a time we
# don't need a lookup table to map interleaved quotes
# from multiple possible mkt-pairs
topic: str = mkt.bs_fqme
# deliver until cancelled
async for typ, ref in from_cf:
match typ:
case 'trade':
trade: cryptofeed.types.Trade = ref
# TODO, re-impl this according to the ideal
# fqme for opts that we choose!!
bs_fqme: str = cb_sym_to_deribit_inst(
str_to_cb_sym(trade.symbol)
).lower()
piker_quote: dict = {
'symbol': bs_fqme,
'last': trade.price,
'broker_ts': time.time(),
# ^TODO, name this `brokerd/datad_ts` and
# use `time.time_ns()` ??
'ticks': [{
'type': 'trade',
'price': float(trade.price),
'size': float(trade.amount),
'broker_ts': trade.timestamp,
}],
}
log.info(
f'deribit {typ!r} quote for {sym!r}\n\n'
f'{trade}\n\n'
f'{pfmt(piker_quote)}\n'
)
case 'l1':
book: cryptofeed.types.L1Book = ref
# TODO, so this is where we can possibly change things
# and instead lever the `MktPair.bs_fqme: str` output?
bs_fqme: str = cb_sym_to_deribit_inst(
str_to_cb_sym(book.symbol)
).lower()
piker_quote: dict = {
'symbol': bs_fqme,
'ticks': [
{'type': 'bid',
'price': float(book.bid_price),
'size': float(book.bid_size)},
{'type': 'bsize',
'price': float(book.bid_price),
'size': float(book.bid_size),},
{'type': 'ask',
'price': float(book.ask_price),
'size': float(book.ask_size),},
{'type': 'asize',
'price': float(book.ask_price),
'size': float(book.ask_size),}
]
}
await send_chan.send({
topic: piker_quote,
})
@tractor.context
@ -259,13 +391,13 @@ async def open_symbol_search(
async with open_cached_client('deribit') as client:
# load all symbols locally for fast search
cache = client._pairs
# cache = client._pairs
await ctx.started()
async with ctx.open_stream() as stream:
pattern: str
async for pattern in stream:
# NOTE: pattern fuzzy-matching is done within
# the methd impl.
pairs: dict[str, Pair] = await client.search_symbols(

View File

@ -22,11 +22,10 @@ from __future__ import annotations
import pendulum
from typing import (
Literal,
Optional,
)
from decimal import Decimal
from msgspec import field
from piker.types import Struct
@ -111,18 +110,21 @@ class OptionPair(Pair, frozen=True):
block_trade_min_trade_amount: int # '25'
block_trade_commission: float # '0.003'
# NOTE: see `.data._symcache.SymbologyCache.load()` for why
ns_path: str = 'piker.brokers.deribit:OptionPair'
# TODO, impl this without the MM:SS part of
# the `'THH:MM:SS..'` etc..
@property
def expiry(self) -> str:
iso_date = pendulum.from_timestamp(self.expiration_timestamp / 1000).isoformat()
iso_date = pendulum.from_timestamp(
self.expiration_timestamp / 1000
).isoformat()
return iso_date
@property
def venue(self) -> str:
return 'option'
return f'{self.instrument_type}_option'
@property
def bs_fqme(self) -> str:
@ -152,6 +154,7 @@ class JSONRPCResult(Struct):
error: Optional[dict] = None
result: Optional[list[dict]] = None
class JSONRPCChannel(Struct):
method: str
params: dict
@ -168,6 +171,7 @@ class KLinesResult(Struct):
status: str
volume: list[float]
class Trade(Struct):
iv: float
price: float
@ -186,6 +190,7 @@ class Trade(Struct):
block_trade_id: Optional[str] = '',
block_trade_leg_count: Optional[int] = 0,
class LastTradesResult(Struct):
trades: list[Trade]
has_more: bool

View File

@ -30,6 +30,7 @@ import time
from typing import (
Any,
AsyncIterator,
Callable,
TYPE_CHECKING,
)
@ -54,6 +55,9 @@ from ._util import (
get_console_log,
)
from ..service import maybe_spawn_daemon
from piker.log import (
mk_repr,
)
if TYPE_CHECKING:
from ._sharedmem import (
@ -575,7 +579,6 @@ async def open_sample_stream(
async def sample_and_broadcast(
bus: _FeedsBus, # noqa
rt_shm: ShmArray,
hist_shm: ShmArray,
@ -596,11 +599,22 @@ async def sample_and_broadcast(
overruns = Counter()
# multiline nested `dict` formatter (since rn quote-msgs are
# just that).
pfmt: Callable[[str], str] = mk_repr()
# iterate stream delivered by broker
async for quotes in quote_stream:
# print(quotes)
# TODO: ``numba`` this!
# XXX WARNING XXX only enable for debugging bc ow can cost
# ALOT of perf with HF-feedz!!!
#
# log.info(
# 'Rx live quotes:\n'
# f'{pfmt(quotes)}'
# )
# TODO: `numba` this!
for broker_symbol, quote in quotes.items():
# TODO: in theory you can send the IPC msg *before* writing
# to the sharedmem array to decrease latency, however, that
@ -673,6 +687,18 @@ async def sample_and_broadcast(
sub_key: str = broker_symbol.lower()
subs: set[Sub] = bus.get_subs(sub_key)
if not subs:
all_bs_fqmes: list[str] = list(
bus._subscribers.keys()
)
log.warning(
f'No subscribers for {brokername!r} live-quote ??\n'
f'broker_symbol: {broker_symbol}\n\n'
f'Maybe the backend-sys symbol does not match one of,\n'
f'{pfmt(all_bs_fqmes)}\n'
)
# NOTE: by default the broker backend doesn't append
# it's own "name" into the fqme schema (but maybe it
# should?) so we have to manually generate the correct

View File

@ -360,7 +360,7 @@ async def open_autorecon_ws(
'''
JSONRPC response-request style machinery for transparent multiplexing
of msgs over a NoBsWs.
of msgs over a `NoBsWs`.
'''
@ -377,6 +377,16 @@ async def open_jsonrpc_session(
url: str,
start_id: int = 0,
response_type: type = JSONRPCResult,
msg_recv_timeout: float = float('inf'),
# ^NOTE, since only `deribit` is using this jsonrpc stuff atm
# and options mkts are generally "slow moving"..
#
# FURTHER if we break the underlying ws connection then since we
# don't pass a `fixture` to the task that manages `NoBsWs`, i.e.
# `_reconnect_forever()`, the jsonrpc "transport pipe" get's
# broken and never restored with wtv init sequence is required to
# re-establish a working req-resp session.
# request_type: Optional[type] = None,
# request_hook: Optional[Callable] = None,
# error_hook: Optional[Callable] = None,
@ -388,12 +398,18 @@ async def open_jsonrpc_session(
async with (
trio.open_nursery() as n,
open_autorecon_ws(url) as ws
open_autorecon_ws(
url=url,
msg_recv_timeout=msg_recv_timeout,
) as ws
):
rpc_id: Iterable[int] = count(start_id)
rpc_results: dict[int, dict] = {}
async def json_rpc(method: str, params: dict) -> dict:
async def json_rpc(
method: str,
params: dict,
) -> dict:
'''
Perform a JSON-RPC call and wait for the result; raise an exception
if an error field is present in the response.
@ -483,7 +499,7 @@ async def open_jsonrpc_session(
# response in original "result" msg,
# THEN FINALLY set the event to signal caller
# to raise the error in the parent task.
req_id: int = error['id']
req_id: int = msg['id']
req_msg: dict = req_msgs[req_id]
result: dict = rpc_results[req_id]
result['error'] = error

View File

@ -540,7 +540,10 @@ async def open_feed_bus(
# subscription since the backend isn't (yet) expected to
# append it's own name to the fqme, so we filter on keys
# which *do not* include that name (e.g .ib) .
bus._subscribers.setdefault(bs_fqme, set())
bus._subscribers.setdefault(
bs_fqme,
set(),
)
# sync feed subscribers with flume handles
await ctx.started(

View File

@ -18,7 +18,11 @@
Log like a forester!
"""
import logging
import reprlib
import json
from typing import (
Callable,
)
import tractor
from pygments import (
@ -84,3 +88,27 @@ def colorize_json(
# likeable styles: algol_nu, tango, monokai
formatters.TerminalTrueColorFormatter(style=style)
)
def mk_repr(
**repr_kws,
) -> Callable[[str], str]:
'''
Allocate and deliver a `reprlib.Repr` instance with provided input
settings using the std-lib's `reprlib` mod,
* https://docs.python.org/3/library/reprlib.html
------ Ex. ------
An up to 6-layer-nested `dict` as multi-line:
- https://stackoverflow.com/a/79102479
- https://docs.python.org/3/library/reprlib.html#reprlib.Repr.maxlevel
'''
def_kws: dict[str, int] = dict(
indent=2,
maxlevel=6, # recursion levels
maxstring=66, # match editor line-len limit
)
def_kws |= repr_kws
reprr = reprlib.Repr(**def_kws)
return reprr.repr

View File

@ -161,7 +161,12 @@ class NativeStorageClient:
def index_files(self):
for path in self._datadir.iterdir():
if path.name in {'borked', 'expired',}:
if (
path.name in {'borked', 'expired',}
or
'.parquet' not in str(path)
):
# ignore all non-apache files (for now)
continue
key: str = path.name.rstrip('.parquet')

View File

@ -44,8 +44,10 @@ import trio
from trio_typing import TaskStatus
import tractor
from pendulum import (
Interval,
DateTime,
Duration,
duration as mk_duration,
from_timestamp,
)
import numpy as np
@ -214,7 +216,8 @@ async def maybe_fill_null_segments(
# pair, immediately stop backfilling?
if (
start_dt
and end_dt < start_dt
and
end_dt < start_dt
):
await tractor.pause()
break
@ -262,6 +265,7 @@ async def maybe_fill_null_segments(
except tractor.ContextCancelled:
# log.exception
await tractor.pause()
raise
null_segs_detected.set()
# RECHECK for more null-gaps
@ -349,7 +353,7 @@ async def maybe_fill_null_segments(
async def start_backfill(
get_hist,
frame_types: dict[str, Duration] | None,
def_frame_duration: Duration,
mod: ModuleType,
mkt: MktPair,
shm: ShmArray,
@ -379,22 +383,23 @@ async def start_backfill(
update_start_on_prepend: bool = False
if backfill_until_dt is None:
# TODO: drop this right and just expose the backfill
# limits inside a [storage] section in conf.toml?
# when no tsdb "last datum" is provided, we just load
# some near-term history.
# periods = {
# 1: {'days': 1},
# 60: {'days': 14},
# }
# do a decently sized backfill and load it into storage.
# TODO: per-provider default history-durations?
# -[ ] inside the `open_history_client()` config allow
# declaring the history duration limits instead of
# guessing and/or applying the same limits to all?
#
# -[ ] allow declaring (default) per-provider backfill
# limits inside a [storage] sub-section in conf.toml?
#
# NOTE, when no tsdb "last datum" is provided, we just
# load some near-term history by presuming a "decently
# large" 60s duration limit and a much shorter 1s range.
periods = {
1: {'days': 2},
60: {'years': 6},
}
period_duration: int = periods[timeframe]
update_start_on_prepend = True
update_start_on_prepend: bool = True
# NOTE: manually set the "latest" datetime which we intend to
# backfill history "until" so as to adhere to the history
@ -416,7 +421,6 @@ async def start_backfill(
f'backfill_until_dt: {backfill_until_dt}\n'
f'last_start_dt: {last_start_dt}\n'
)
try:
(
array,
@ -426,37 +430,58 @@ async def start_backfill(
timeframe,
end_dt=last_start_dt,
)
except NoData as _daterr:
# 3 cases:
# - frame in the middle of a legit venue gap
# - history actually began at the `last_start_dt`
# - some other unknown error (ib blocking the
# history bc they don't want you seeing how they
# cucked all the tinas..)
if dur := frame_types.get(timeframe):
# decrement by a frame's worth of duration and
# retry a few times.
last_start_dt.subtract(
seconds=dur.total_seconds()
orig_last_start_dt: datetime = last_start_dt
gap_report: str = (
f'EMPTY FRAME for `end_dt: {last_start_dt}`?\n'
f'{mod.name} -> tf@fqme: {timeframe}@{mkt.fqme}\n'
f'last_start_dt: {orig_last_start_dt}\n\n'
f'bf_until: {backfill_until_dt}\n'
)
# EMPTY FRAME signal with 3 (likely) causes:
#
# 1. range contains legit gap in venue history
# 2. history actually (edge case) **began** at the
# value `last_start_dt`
# 3. some other unknown error (ib blocking the
# history-query bc they don't want you seeing how
# they cucked all the tinas.. like with options
# hist)
#
if def_frame_duration:
# decrement by a duration's (frame) worth of time
# as maybe indicated by the backend to see if we
# can get older data before this possible
# "history gap".
last_start_dt: datetime = last_start_dt.subtract(
seconds=def_frame_duration.total_seconds()
)
log.warning(
f'{mod.name} -> EMPTY FRAME for end_dt?\n'
f'tf@fqme: {timeframe}@{mkt.fqme}\n'
'bf_until <- last_start_dt:\n'
f'{backfill_until_dt} <- {last_start_dt}\n'
f'Decrementing `end_dt` by {dur} and retry..\n'
gap_report += (
f'Decrementing `end_dt` and retrying with,\n'
f'def_frame_duration: {def_frame_duration}\n'
f'(new) last_start_dt: {last_start_dt}\n'
)
log.warning(gap_report)
# skip writing to shm/tsdb and try the next
# duration's worth of prior history.
continue
else:
# await tractor.pause()
raise DataUnavailable(gap_report)
# broker says there never was or is no more history to pull
except DataUnavailable:
except DataUnavailable as due:
message: str = due.args[0]
log.warning(
f'NO-MORE-DATA in range?\n'
f'`{mod.name}` halted history:\n'
f'tf@fqme: {timeframe}@{mkt.fqme}\n'
'bf_until <- last_start_dt:\n'
f'{backfill_until_dt} <- {last_start_dt}\n'
f'Provider {mod.name!r} halted backfill due to,\n\n'
f'{message}\n'
f'fqme: {mkt.fqme}\n'
f'timeframe: {timeframe}\n'
f'last_start_dt: {last_start_dt}\n'
f'bf_until: {backfill_until_dt}\n'
)
# UGH: what's a better way?
# TODO: backends are responsible for being correct on
@ -465,34 +490,54 @@ async def start_backfill(
# to halt the request loop until the condition is
# resolved or should the backend be entirely in
# charge of solving such faults? yes, right?
# if timeframe > 1:
# await tractor.pause()
return
time: np.ndarray = array['time']
assert (
array['time'][0]
time[0]
==
next_start_dt.timestamp()
)
diff = last_start_dt - next_start_dt
frame_time_diff_s = diff.seconds
assert time[-1] == next_end_dt.timestamp()
expected_dur: Interval = last_start_dt - next_start_dt
# frame's worth of sample-period-steps, in seconds
frame_size_s: float = len(array) * timeframe
expected_frame_size_s: float = frame_size_s + timeframe
if frame_time_diff_s > expected_frame_size_s:
recv_frame_dur: Duration = (
from_timestamp(array[-1]['time'])
-
from_timestamp(array[0]['time'])
)
if (
(lt_frame := (recv_frame_dur < expected_dur))
or
(null_frame := (frame_size_s == 0))
# ^XXX, should NEVER hit now!
):
# XXX: query result includes a start point prior to our
# expected "frame size" and thus is likely some kind of
# history gap (eg. market closed period, outage, etc.)
# so just report it to console for now.
if lt_frame:
reason = 'Possible GAP (or first-datum)'
else:
assert null_frame
reason = 'NULL-FRAME'
missing_dur: Interval = expected_dur.end - recv_frame_dur.end
log.warning(
'GAP DETECTED:\n'
f'last_start_dt: {last_start_dt}\n'
f'diff: {diff}\n'
f'frame_time_diff_s: {frame_time_diff_s}\n'
f'{timeframe}s-series {reason} detected!\n'
f'fqme: {mkt.fqme}\n'
f'last_start_dt: {last_start_dt}\n\n'
f'recv interval: {recv_frame_dur}\n'
f'expected interval: {expected_dur}\n\n'
f'Missing duration of history of {missing_dur.in_words()!r}\n'
f'{missing_dur}\n'
)
# await tractor.pause()
to_push = diff_history(
array,
@ -567,7 +612,8 @@ async def start_backfill(
# long-term storage.
if (
storage is not None
and write_tsdb
and
write_tsdb
):
log.info(
f'Writing {ln} frame to storage:\n'
@ -688,7 +734,7 @@ async def back_load_from_tsdb(
last_tsdb_dt
and latest_start_dt
):
backfilled_size_s = (
backfilled_size_s: Duration = (
latest_start_dt - last_tsdb_dt
).seconds
# if the shm buffer len is not large enough to contain
@ -911,6 +957,8 @@ async def tsdb_backfill(
f'{pformat(config)}\n'
)
# concurrently load the provider's most-recent-frame AND any
# pre-existing tsdb history already saved in `piker` storage.
dt_eps: list[DateTime, DateTime] = []
async with trio.open_nursery() as tn:
tn.start_soon(
@ -921,7 +969,6 @@ async def tsdb_backfill(
timeframe,
config,
)
tsdb_entry: tuple = await load_tsdb_hist(
storage,
mkt,
@ -950,6 +997,25 @@ async def tsdb_backfill(
mr_end_dt,
) = dt_eps
first_frame_dur_s: Duration = (mr_end_dt - mr_start_dt).seconds
calced_frame_size: Duration = mk_duration(
seconds=first_frame_dur_s,
)
# NOTE, attempt to use the backend declared default frame
# sizing (as allowed by their time-series query APIs) and
# if not provided try to construct a default from the
# first frame received above.
def_frame_durs: dict[
int,
Duration,
]|None = config.get('frame_types', None)
if def_frame_durs:
def_frame_size: Duration = def_frame_durs[timeframe]
assert def_frame_size == calced_frame_size
else:
# use what we calced from first frame above.
def_frame_size = calced_frame_size
# NOTE: when there's no offline data, there's 2 cases:
# - data backend doesn't support timeframe/sample
# period (in which case `dt_eps` should be `None` and
@ -980,7 +1046,7 @@ async def tsdb_backfill(
partial(
start_backfill,
get_hist=get_hist,
frame_types=config.get('frame_types', None),
def_frame_duration=def_frame_size,
mod=mod,
mkt=mkt,
shm=shm,

poetry.lock (generated, 1751 changes): file diff suppressed because it is too large.

View File

@ -25,11 +25,11 @@ build-backend = "poetry.core.masonry.api"
ignore = []
# https://docs.astral.sh/ruff/settings/#lint_per-file-ignores
"piker/ui/qt.py" = [
"E402",
'F401', # unused imports (without __all__ or blah as blah)
# "F841", # unused variable rules
]
# "piker/ui/qt.py" = [
# "E402",
# 'F401', # unused imports (without __all__ or blah as blah)
# # "F841", # unused variable rules
# ]
# ignore-init-module-imports = false
# ------ - ------
@ -72,12 +72,8 @@ httpx = "^0.27.0"
cryptofeed = "^2.4.0"
pyarrow = "^17.0.0"
[tool.poetry.dependencies.tractor]
develop = true
git = 'https://pikers.dev/goodboy/tractor.git'
branch = 'aio_abandons'
# path = "../tractor"
tractor = {path = "../tractor", develop = true}
websockets = "12.0"
[tool.poetry.dependencies.asyncvnc]
git = 'https://github.com/pikers/asyncvnc.git'
branch = 'main'