Compare commits

..

5 Commits

Author SHA1 Message Date
Nelson Torres ffbca17ba2 Deribit's feed fix
- `FeedInit` is now used for `init_msgs` in `stream_quotes` (see the sketch after this entry).

- the new cache is `client_pairs`, replacing the old `client.cache_symbols`.

- `get_mkt_info` added

- `get_ohlc` fixed to comply with the new feed API.
2025-01-29 19:24:57 -03:00
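
For context, a minimal sketch of the `FeedInit` handshake described above, reusing the `stream_quotes`/`get_mkt_info` names visible in the feed diff further down (the `first_quote` payload is elided to a stub):

```python
from piker.data.validate import FeedInit

async def stream_quotes(
    send_chan,
    symbols: list[str],
    feed_is_live,
    task_status,
) -> None:
    sym: str = symbols[0].split('.')[0]
    init_msgs: list[FeedInit] = []

    # `get_mkt_info()` (added in this commit) resolves a
    # (MktPair, Pair) tuple for the requested symbol.
    mkt, pair = await get_mkt_info(sym)
    init_msgs.append(FeedInit(mkt_info=mkt))

    first_quote: dict = {'symbol': sym}  # last-trade summary, stubbed
    task_status.started((init_msgs, first_quote))
```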
Nelson Torres 75891fcac9 Deribit's api fix
key changes:

- Resolved the issue with expiration dates from deribit: we now use an `int` (epoch time) instead of the crazy custom deribit format.

- The client now has a new `_json_rpc_auth_wrapper` that acquires an initial access token and then refreshes it whenever it expires (see the sketch after this entry).

- `get_assets` fixed: we now use the public endpoint to check the available assets. This will probably change in the future, but for now it works just fine.

- `get_mkt_pairs` added.

- `exch_info` added.

- `cache_symbols` fixed.

- Also a lot of reformatting was done in the api module.
2025-01-29 19:24:57 -03:00
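
A rough sketch of that acquire-then-refresh flow; the `_auth_ts`/`_auth_renew_ts` fields appear in the api diff below, while `_acquire_access_token()` is a hypothetical stand-in for the real `public/auth` request:

```python
import time

class Client:
    def __init__(self) -> None:
        self._access_token: str | None = None
        self._auth_ts: float | None = None  # when the token was minted
        self._auth_renew_ts: float = 5      # max token age in seconds

    async def _acquire_access_token(self) -> None:
        # hypothetical: call deribit's `public/auth` endpoint and
        # stash the returned `access_token`.
        ...

    async def _json_rpc_auth_wrapper(self, *args, **kwargs):
        '''
        Acquire a first access token, refresh it once older than
        `_auth_renew_ts`, then forward the wrapped json-rpc request
        via `self.json_rpc()` (defined elsewhere on the client).
        '''
        now: float = time.time()
        if (
            self._access_token is None
            or (now - self._auth_ts) > self._auth_renew_ts
        ):
            await self._acquire_access_token()
            self._auth_ts = now

        return await self.json_rpc(*args, **kwargs)
```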
Nelson Torres 3fb71e9b93 Venues
Moved all the msgspec structs from api to venues; also added critical imports in the api, feed and __init__ mods (an illustrative struct shape follows this entry).
2025-01-29 19:24:57 -03:00
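
Illustratively, the kind of `msgspec`-backed struct that now lives in `venues` (field set abridged from the `JSONRPCResult` definition visible in the venues diff below):

```python
from typing import Optional
from piker.types import Struct  # piker's `msgspec.Struct` wrapper

class JSONRPCResult(Struct):
    # abridged, for illustration only
    id: int
    jsonrpc: str = '2.0'
    error: Optional[dict] = None
    result: Optional[list[dict]] = None
```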
Nelson Torres 3b90f9c5e8 New deps in the `pyproject.toml`:
- `cryptofeed` 2.4.0
- `pyarrow` 17.0.0
- `poetry.lock` updated too.
2025-01-29 19:24:57 -03:00
Tyler Goodlet 91398b7b4f data._web_bs: try to raise jsonrpc errors in parent task 2025-01-29 19:16:55 -03:00
11 changed files with 975 additions and 1677 deletions


@ -28,8 +28,6 @@ from decimal import (
Decimal, Decimal,
) )
from functools import partial from functools import partial
from pathlib import Path
from pprint import pformat
import time import time
from typing import ( from typing import (
Any, Any,
@ -39,6 +37,8 @@ from typing import (
from pendulum import now from pendulum import now
import trio import trio
from trio_typing import TaskStatus
from rapidfuzz import process as fuzzy
import numpy as np import numpy as np
from tractor.trionics import ( from tractor.trionics import (
broadcast_receiver, broadcast_receiver,
@ -55,11 +55,8 @@ from cryptofeed.defines import (
OPTION, CALL, PUT OPTION, CALL, PUT
) )
from cryptofeed.symbols import Symbol from cryptofeed.symbols import Symbol
from cryptofeed.types import ( # types for managing the cb callbacks.
L1Book, # from cryptofeed.types import L1Book
Trade,
)
from piker.brokers import SymbolNotFound
from .venues import ( from .venues import (
_ws_url, _ws_url,
MarketType, MarketType,
@ -67,7 +64,9 @@ from .venues import (
Pair, Pair,
OptionPair, OptionPair,
JSONRPCResult, JSONRPCResult,
JSONRPCChannel,
KLinesResult, KLinesResult,
Trade,
LastTradesResult, LastTradesResult,
) )
from piker.accounting import ( from piker.accounting import (
@ -78,7 +77,7 @@ from piker.accounting import (
from piker.data import ( from piker.data import (
def_iohlcv_fields, def_iohlcv_fields,
match_from_pairs, match_from_pairs,
# Struct, Struct,
) )
from piker.data._web_bs import ( from piker.data._web_bs import (
open_jsonrpc_session open_jsonrpc_session
@ -97,17 +96,9 @@ _spawn_kwargs = {
} }
def deribit_timestamp(when: datetime) -> int: # convert datetime obj timestamp to unixtime in milliseconds
''' def deribit_timestamp(when):
Convert conventional epoch timestamp, in secs, to unixtime in return int((when.timestamp() * 1000) + (when.microsecond / 1000))
milliseconds.
'''
return int(
(when.timestamp() * 1000)
+
(when.microsecond / 1000)
)
def str_to_cb_sym(name: str) -> Symbol: def str_to_cb_sym(name: str) -> Symbol:
@ -130,20 +121,14 @@ def str_to_cb_sym(name: str) -> Symbol:
type=OPTION, type=OPTION,
strike_price=strike_price, strike_price=strike_price,
option_type=option_type, option_type=option_type,
expiry_date=new_expiry_date expiry_date=new_expiry_date)
)
def piker_sym_to_cb_sym(name: str) -> Symbol: def piker_sym_to_cb_sym(name: str) -> Symbol:
( base, expiry_date, strike_price, option_type = tuple(
base,
expiry_date,
strike_price,
option_type,
)= tuple(
name.upper().split('-')) name.upper().split('-'))
quote: str = base quote = base
if option_type == 'P': if option_type == 'P':
option_type = PUT option_type = PUT
@ -158,32 +143,14 @@ def piker_sym_to_cb_sym(name: str) -> Symbol:
type=OPTION, type=OPTION,
strike_price=strike_price, strike_price=strike_price,
option_type=option_type, option_type=option_type,
expiry_date=expiry_date expiry_date=expiry_date)
)
# TODO, instead can't we just lookup the `MktPair` directly def cb_sym_to_deribit_inst(sym: Symbol):
# and pass it upward to `stream_quotes()`??
def cb_sym_to_deribit_inst(sym: Symbol) -> str:
'''
Generate our own internal `str`-repr for a `cryptofeed.Symbol`
uniquely from its fields.
This is the equiv of generating a `Pair.fmqe` from `cryptofeed`
for now i suppose..?
'''
new_expiry_date = get_values_from_cb_normalized_date(sym.expiry_date) new_expiry_date = get_values_from_cb_normalized_date(sym.expiry_date)
otype = ( otype = 'C' if sym.option_type == CALL else 'P'
'C' if sym.option_type == CALL
else 'P' return f'{sym.base}-{new_expiry_date}-{sym.strike_price}-{otype}'
)
return (
f'{sym.base}-'
f'{new_expiry_date}-'
f'{sym.strike_price}-'
f'{otype}'
)
def get_values_from_cb_normalized_date(expiry_date: str) -> str: def get_values_from_cb_normalized_date(expiry_date: str) -> str:
@ -212,18 +179,16 @@ def get_config() -> dict[str, Any]:
conf: dict conf: dict
path: Path path: Path
conf, path = config.load( conf, path = config.load(
conf_name='brokers', conf_name='brokers',
touch_if_dne=True, touch_if_dne=True,
) )
section: dict|None = conf.get('deribit') section: dict = {}
section = conf.get('deribit')
if section is None: if section is None:
raise ValueError( log.warning(f'No config section found for deribit in {path}')
f'No `[deribit]` section found in\n' return {}
f'{path!r}\n\n'
f'See the template config from the core repo for samples..\n'
# f'<TODO put repo link here??>'
)
conf_option = section.get('option', {}) conf_option = section.get('option', {})
section.clear # clear the dict to reuse it section.clear # clear the dict to reuse it
@ -239,10 +204,7 @@ def get_config() -> dict[str, Any]:
class Client: class Client:
'''
Hi-level interface for the jsron-RPC over websocket API.
'''
def __init__( def __init__(
self, self,
@ -261,12 +223,8 @@ class Client:
self._auth_ts = None self._auth_ts = None
self._auth_renew_ts = 5 # seconds to renew auth self._auth_renew_ts = 5 # seconds to renew auth
async def _json_rpc_auth_wrapper( async def _json_rpc_auth_wrapper(self, *args, **kwargs) -> JSONRPCResult:
self,
*args,
**kwargs,
) -> JSONRPCResult:
"""Background task that adquires a first access token and then will """Background task that adquires a first access token and then will
refresh the access token. refresh the access token.
@ -292,6 +250,9 @@ class Client:
return await self.json_rpc(*args, **kwargs) return await self.json_rpc(*args, **kwargs)
async def get_balances( async def get_balances(
self, self,
kind: str = 'option' kind: str = 'option'
@ -316,29 +277,23 @@ class Client:
venue: str | None = None, venue: str | None = None,
) -> dict[str, Asset]: ) -> dict[str, Asset]:
''' """Return the set of asset balances for this account
Return the set of asset balances for this account by symbol.
by (deribit's) symbol. """
'''
assets = {} assets = {}
resp = await self._json_rpc_auth_wrapper( resp = await self._json_rpc_auth_wrapper(
'public/get_currencies', 'public/get_currencies',
params={} params={}
) )
currencies: list[dict] = resp.result currencies = resp.result
for currency in currencies: for currency in currencies:
name: str = currency['currency'] name = currency['currency']
tx_tick: Decimal = digits_to_dec(currency['fee_precision']) tx_tick = digits_to_dec(currency['fee_precision'])
atype='crypto_currency'
# TODO, handling of options, futures, perps etc. more
# specifically with diff `.atype`s?
assets[name] = Asset( assets[name] = Asset(
name=name, name=name,
atype='crypto_currency', atype=atype,
tx_tick=tx_tick, tx_tick=tx_tick)
)
instruments = await self.symbol_info(currency=name) instruments = await self.symbol_info(currency=name)
for instrument in instruments: for instrument in instruments:
@ -346,10 +301,9 @@ class Client:
assets[pair.symbol] = Asset( assets[pair.symbol] = Asset(
name=pair.symbol, name=pair.symbol,
atype=pair.venue, atype=pair.venue,
tx_tick=pair.size_tick, tx_tick=pair.size_tick)
)
return assets return assets
async def get_mkt_pairs(self) -> dict[str, Pair]: async def get_mkt_pairs(self) -> dict[str, Pair]:
flat: dict[str, Pair] = {} flat: dict[str, Pair] = {}
@ -404,19 +358,6 @@ class Client:
return cached_pair return cached_pair
if sym: if sym:
opt: OptionPair|None = pair_table.get(sym)
if not opt:
closest_matches: dict[str, Pair] = match_from_pairs(
pairs=pair_table,
query=sym,
score_cutoff=40,
)
closest_syms: list[str] = list(closest_matches.keys())
raise ValueError(
f'No contract found for {sym!r}\n\n'
f'Closest {len(closest_syms)} available contracts:\n\n'
f'{pformat(closest_syms)}\n'
)
return pair_table[sym] return pair_table[sym]
else: else:
return self._pairs return self._pairs
@ -440,7 +381,7 @@ class Client:
params: dict[str, str] = { params: dict[str, str] = {
'currency': currency.upper(), 'currency': currency.upper(),
'kind': kind, 'kind': kind,
'expired': expired, 'expired': str(expired).lower()
} }
resp: JSONRPCResult = await self._json_rpc_auth_wrapper( resp: JSONRPCResult = await self._json_rpc_auth_wrapper(
@ -448,9 +389,9 @@ class Client:
params, params,
) )
# convert to symbol-keyed table # convert to symbol-keyed table
pair_type: Pair = PAIRTYPES[kind] pair_type: Type = PAIRTYPES[kind]
results: list[dict] | None = resp.result results: list[dict] | None = resp.result
instruments: dict[str, Pair] = {} instruments: dict[str, Pair] = {}
for item in results: for item in results:
symbol=item['instrument_name'].lower() symbol=item['instrument_name'].lower()
@ -486,15 +427,12 @@ class Client:
mkt_pairs = await self.symbol_info() mkt_pairs = await self.symbol_info()
if not mkt_pairs: if not mkt_pairs:
raise SymbolNotFound( raise SymbolNotFound(f'No market pairs found!?:\n{resp}')
f'No market pairs found!?:\n'
f'{mkt_pairs}'
)
pairs_view_subtable: dict[str, Pair] = {} pairs_view_subtable: dict[str, Pair] = {}
for instrument in mkt_pairs: for instrument in mkt_pairs:
pair_type: Pair|OptionPair = PAIRTYPES[venue] pair_type: Type = PAIRTYPES[venue]
pair: Pair = pair_type(**mkt_pairs[instrument].to_dict()) pair: Pair = pair_type(**mkt_pairs[instrument].to_dict())
@ -542,14 +480,12 @@ class Client:
if end_dt is None: if end_dt is None:
end_dt = now('UTC') end_dt = now('UTC')
_orig_start_dt = start_dt
if start_dt is None: if start_dt is None:
start_dt = end_dt.start_of( start_dt = end_dt.start_of(
'minute' 'minute').subtract(minutes=limit)
).subtract(minutes=limit)
start_time: int = deribit_timestamp(start_dt) start_time = deribit_timestamp(start_dt)
end_time: int = deribit_timestamp(end_dt) end_time = deribit_timestamp(end_dt)
# https://docs.deribit.com/#public-get_tradingview_chart_data # https://docs.deribit.com/#public-get_tradingview_chart_data
resp = await self._json_rpc_auth_wrapper( resp = await self._json_rpc_auth_wrapper(
@ -563,13 +499,9 @@ class Client:
result = KLinesResult(**resp.result) result = KLinesResult(**resp.result)
new_bars: list[tuple] = [] new_bars: list[tuple] = []
# if _orig_start_dt is None:
# if not new_bars:
# import tractor
# await tractor.pause()
for i in range(len(result.close)): for i in range(len(result.close)):
row = [
row = [
(start_time + (i * (60 * 1000))) / 1000.0, # time (start_time + (i * (60 * 1000))) / 1000.0, # time
result.open[i], result.open[i],
result.high[i], result.high[i],
@ -622,7 +554,7 @@ async def get_client(
@acm @acm
async def open_feed_handler() -> FeedHandler: async def open_feed_handler():
fh = FeedHandler(config=get_config()) fh = FeedHandler(config=get_config())
yield fh yield fh
await to_asyncio.run_task(fh.stop_async) await to_asyncio.run_task(fh.stop_async)
@ -643,37 +575,43 @@ async def aio_price_feed_relay(
from_trio: asyncio.Queue, from_trio: asyncio.Queue,
to_trio: trio.abc.SendChannel, to_trio: trio.abc.SendChannel,
) -> None: ) -> None:
''' async def _trade(data: dict, receipt_timestamp):
Relay price feed quotes from the `cryptofeed.FeedHandler` to to_trio.send_nowait(('trade', {
the `piker`-side `trio.task` consumers for delivery to consumer 'symbol': cb_sym_to_deribit_inst(
sub-actors for various subsystems. str_to_cb_sym(data.symbol)).lower(),
'last': data,
''' 'broker_ts': time.time(),
async def _trade( 'data': data.to_dict(),
trade: Trade, # cryptofeed, NOT ours from `.venues`! 'receipt': receipt_timestamp
receipt_timestamp: int, }))
) -> None:
'''
Proxy-thru `cryptofeed.FeedHandler` "trades" to `piker`-side.
'''
to_trio.send_nowait(('trade', trade))
async def _l1(
book: L1Book,
receipt_timestamp: int,
) -> None:
'''
Relay-thru "l1 book" updates.
'''
to_trio.send_nowait(('l1', book))
# TODO, make this work!
# -[ ] why isn't this working in `tractor.pause_from_sync()`??
# breakpoint()
async def _l1(data: dict, receipt_timestamp):
to_trio.send_nowait(('l1', {
'symbol': cb_sym_to_deribit_inst(
str_to_cb_sym(data.symbol)).lower(),
'ticks': [
{
'type': 'bid',
'price': float(data.bid_price),
'size': float(data.bid_size)
},
{
'type': 'bsize',
'price': float(data.bid_price),
'size': float(data.bid_size)
},
{
'type': 'ask',
'price': float(data.ask_price),
'size': float(data.ask_size)
},
{
'type': 'asize',
'price': float(data.ask_price),
'size': float(data.ask_size)
}
]
}))
sym: Symbol = piker_sym_to_cb_sym(instrument) sym: Symbol = piker_sym_to_cb_sym(instrument)
fh.add_feed( fh.add_feed(
DERIBIT, DERIBIT,
@ -687,35 +625,27 @@ async def aio_price_feed_relay(
if not fh.running: if not fh.running:
fh.run( fh.run(
start_loop=False, start_loop=False,
install_signal_handlers=False install_signal_handlers=False)
)
# sync with trio # sync with trio
to_trio.send_nowait(None) to_trio.send_nowait(None)
# run until cancelled
await asyncio.sleep(float('inf')) await asyncio.sleep(float('inf'))
@acm @acm
async def open_price_feed( async def open_price_feed(
instrument: str instrument: str
) -> to_asyncio.LinkedTaskChannel: ) -> trio.abc.ReceiveStream:
async with maybe_open_feed_handler() as fh:
fh: FeedHandler async with to_asyncio.open_channel_from(
first: None
chan: to_asyncio.LinkedTaskChannel
async with (
maybe_open_feed_handler() as fh,
to_asyncio.open_channel_from(
partial( partial(
aio_price_feed_relay, aio_price_feed_relay,
fh, fh,
instrument instrument
) )
) as (first, chan) ) as (first, chan):
): yield chan
yield chan
@acm @acm
@ -724,7 +654,6 @@ async def maybe_open_price_feed(
) -> trio.abc.ReceiveStream: ) -> trio.abc.ReceiveStream:
# TODO: add a predicate to maybe_open_context # TODO: add a predicate to maybe_open_context
feed: to_asyncio.LinkedTaskChannel
async with maybe_open_context( async with maybe_open_context(
acm_func=open_price_feed, acm_func=open_price_feed,
kwargs={ kwargs={
@ -739,69 +668,68 @@ async def maybe_open_price_feed(
# TODO, move all to `.broker` submod! async def aio_order_feed_relay(
# async def aio_order_feed_relay( fh: FeedHandler,
# fh: FeedHandler, instrument: Symbol,
# instrument: Symbol, from_trio: asyncio.Queue,
# from_trio: asyncio.Queue, to_trio: trio.abc.SendChannel,
# to_trio: trio.abc.SendChannel, ) -> None:
# ) -> None: async def _fill(data: dict, receipt_timestamp):
# async def _fill(data: dict, receipt_timestamp): breakpoint()
# breakpoint()
# async def _order_info(data: dict, receipt_timestamp): async def _order_info(data: dict, receipt_timestamp):
# breakpoint() breakpoint()
# fh.add_feed( fh.add_feed(
# DERIBIT, DERIBIT,
# channels=[FILLS, ORDER_INFO], channels=[FILLS, ORDER_INFO],
# symbols=[instrument.upper()], symbols=[instrument.upper()],
# callbacks={ callbacks={
# FILLS: _fill, FILLS: _fill,
# ORDER_INFO: _order_info, ORDER_INFO: _order_info,
# }) })
# if not fh.running: if not fh.running:
# fh.run( fh.run(
# start_loop=False, start_loop=False,
# install_signal_handlers=False) install_signal_handlers=False)
# # sync with trio # sync with trio
# to_trio.send_nowait(None) to_trio.send_nowait(None)
# await asyncio.sleep(float('inf')) await asyncio.sleep(float('inf'))
# @acm @acm
# async def open_order_feed( async def open_order_feed(
# instrument: list[str] instrument: list[str]
# ) -> trio.abc.ReceiveStream: ) -> trio.abc.ReceiveStream:
# async with maybe_open_feed_handler() as fh: async with maybe_open_feed_handler() as fh:
# async with to_asyncio.open_channel_from( async with to_asyncio.open_channel_from(
# partial( partial(
# aio_order_feed_relay, aio_order_feed_relay,
# fh, fh,
# instrument instrument
# ) )
# ) as (first, chan): ) as (first, chan):
# yield chan yield chan
# @acm @acm
# async def maybe_open_order_feed( async def maybe_open_order_feed(
# instrument: str instrument: str
# ) -> trio.abc.ReceiveStream: ) -> trio.abc.ReceiveStream:
# # TODO: add a predicate to maybe_open_context # TODO: add a predicate to maybe_open_context
# async with maybe_open_context( async with maybe_open_context(
# acm_func=open_order_feed, acm_func=open_order_feed,
# kwargs={ kwargs={
# 'instrument': instrument.split('.')[0], 'instrument': instrument.split('.')[0],
# 'fh': fh 'fh': fh
# }, },
# key=f'{instrument.split('.')[0]}-order', key=f'{instrument.split('.')[0]}-order',
# ) as (cache_hit, feed): ) as (cache_hit, feed):
# if cache_hit: if cache_hit:
# yield broadcast_receiver(feed, 10) yield broadcast_receiver(feed, 10)
# else: else:
# yield feed yield feed
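
Setting the commented-out order feed aside, the trio-side consumption of the price-feed channel opened above looks roughly like this; the `('trade', ...)`/`('l1', ...)` tuple shape comes from the `_trade`/`_l1` callbacks in this diff:

```python
async with maybe_open_price_feed(sym) as from_cf:
    # msgs are ('trade', Trade) or ('l1', L1Book) tuples relayed
    # straight from the cryptofeed callbacks on the asyncio side.
    async for typ, ref in from_cf:
        match typ:
            case 'trade':
                ...  # normalize into a piker trade-tick quote
            case 'l1':
                ...  # normalize into bid/ask tick updates
```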


@ -18,58 +18,56 @@
Deribit backend. Deribit backend.
''' '''
from __future__ import annotations
from contextlib import asynccontextmanager as acm from contextlib import asynccontextmanager as acm
from datetime import datetime from datetime import datetime
from typing import ( from typing import Any, Optional, Callable
# Any, from pprint import pformat
# Optional,
Callable,
)
# from pprint import pformat
import time import time
import cryptofeed
import trio import trio
from trio_typing import TaskStatus from trio_typing import TaskStatus
from pendulum import ( from pendulum import (
from_timestamp, from_timestamp,
now,
) )
from rapidfuzz import process as fuzzy
import numpy as np import numpy as np
import tractor import tractor
from piker.accounting import ( from piker.accounting import (
Asset,
MktPair, MktPair,
unpack_fqme, unpack_fqme,
) )
from piker.brokers import ( from piker.brokers import (
open_cached_client, open_cached_client,
NoData, NoData,
DataUnavailable,
) )
from piker._cacheables import ( from piker._cacheables import (
async_lifo_cache, async_lifo_cache,
) )
from piker.log import ( from piker.log import get_logger, get_console_log
get_logger, from piker.data import ShmArray
mk_repr,
)
from piker.data.validate import FeedInit from piker.data.validate import FeedInit
from piker.brokers._util import (
BrokerError,
DataUnavailable,
)
from cryptofeed import FeedHandler
from cryptofeed.defines import (
DERIBIT, L1_BOOK, TRADES, OPTION, CALL, PUT
)
from cryptofeed.symbols import Symbol
from .api import ( from .api import (
Client, Client, Trade,
# get_config, get_config,
piker_sym_to_cb_sym, piker_sym_to_cb_sym, cb_sym_to_deribit_inst,
cb_sym_to_deribit_inst,
str_to_cb_sym,
maybe_open_price_feed maybe_open_price_feed
) )
from .venues import ( from .venues import (
Pair, Pair,
OptionPair, OptionPair,
Trade,
) )
_spawn_kwargs = { _spawn_kwargs = {
@ -88,10 +86,6 @@ async def open_history_client(
# TODO implement history getter for the new storage layer. # TODO implement history getter for the new storage layer.
async with open_cached_client('deribit') as client: async with open_cached_client('deribit') as client:
pair: OptionPair = client._pairs[mkt.dst.name]
# XXX NOTE, the cuckers use ms !!!
creation_time_s: int = pair.creation_timestamp/1000
async def get_ohlc( async def get_ohlc(
timeframe: float, timeframe: float,
end_dt: datetime | None = None, end_dt: datetime | None = None,
@ -111,31 +105,6 @@ async def open_history_client(
end_dt=end_dt, end_dt=end_dt,
) )
if len(array) == 0: if len(array) == 0:
if (
end_dt is None
):
raise DataUnavailable(
'No history seems to exist yet?\n\n'
f'{mkt}'
)
elif (
end_dt
and
end_dt.timestamp() < creation_time_s
):
# the contract can't have history
# before it was created.
pair_type_str: str = type(pair).__name__
create_dt: datetime = from_timestamp(creation_time_s)
raise DataUnavailable(
f'No history prior to\n'
f'`{pair_type_str}.creation_timestamp: int = '
f'{pair.creation_timestamp}\n\n'
f'------ deribit sux ------\n'
f'WHICH IN "NORMAL PEOPLE WHO USE EPOCH TIME" form is,\n'
f'creation_time_s: {creation_time_s}\n'
f'create_dt: {create_dt}\n'
)
raise NoData( raise NoData(
f'No frame for {start_dt} -> {end_dt}\n' f'No frame for {start_dt} -> {end_dt}\n'
) )
@ -157,20 +126,14 @@ async def open_history_client(
return array, start_dt, end_dt return array, start_dt, end_dt
yield ( yield get_ohlc, {'erlangs': 3, 'rate': 3}
get_ohlc,
{ # backfill config
'erlangs': 3,
'rate': 3,
}
)
@async_lifo_cache() @async_lifo_cache()
async def get_mkt_info( async def get_mkt_info(
fqme: str, fqme: str,
) -> tuple[MktPair, Pair|OptionPair] | None: ) -> tuple[MktPair, Pair] | None:
# uppercase since kraken bs_mktid is always upper # uppercase since kraken bs_mktid is always upper
if 'deribit' not in fqme.lower(): if 'deribit' not in fqme.lower():
@ -186,7 +149,7 @@ async def get_mkt_info(
# returns, always! # returns, always!
expiry: str = expiry.upper() expiry: str = expiry.upper()
venue: str = venue.upper() venue: str = venue.upper()
# venue_lower: str = venue.lower() venue_lower: str = venue.lower()
mkt_mode: str = 'option' mkt_mode: str = 'option'
@ -212,88 +175,64 @@ async def get_mkt_info(
price_tick=pair.price_tick, price_tick=pair.price_tick,
size_tick=pair.size_tick, size_tick=pair.size_tick,
bs_mktid=pair.symbol, bs_mktid=pair.symbol,
expiry=pair.expiry,
venue=mkt_mode, venue=mkt_mode,
broker='deribit', broker='deribit',
_atype=mkt_mode, _atype=mkt_mode,
_fqme_without_src=True, _fqme_without_src=True,
# expiry=pair.expiry,
# XXX TODO, currently we don't use it since it's
# already "described" in the `OptionPair.symbol: str`
# and if we slap in the ISO repr it's kinda hideous..
# -[ ] figure out the best either std
) )
return mkt, pair return mkt, pair
async def stream_quotes( async def stream_quotes(
send_chan: trio.abc.SendChannel, send_chan: trio.abc.SendChannel,
symbols: list[str], symbols: list[str],
feed_is_live: trio.Event, feed_is_live: trio.Event,
loglevel: str = None,
# startup sync # startup sync
task_status: TaskStatus[tuple[dict, dict]] = trio.TASK_STATUS_IGNORED, task_status: TaskStatus[tuple[dict, dict]] = trio.TASK_STATUS_IGNORED,
) -> None: ) -> None:
''' # XXX: required to propagate ``tractor`` loglevel to piker logging
Open a live quote stream for the market set defined by `symbols`. get_console_log(loglevel or tractor.current_actor().loglevel)
Internally this starts a `cryptofeed.FeedHandler` inside an `asyncio`-side
task and relays through L1 and `Trade` msgs here to our `trio.Task`.
'''
sym = symbols[0].split('.')[0] sym = symbols[0].split('.')[0]
init_msgs: list[FeedInit] = []
# multiline nested `dict` formatter (since rn quote-msgs are init_msgs: list[FeedInit] = []
# just that).
pfmt: Callable[[str], str] = mk_repr(
# so we can see `deribit`'s delightfully mega-long bs fields..
maxstring=100,
)
async with ( async with (
open_cached_client('deribit') as client, open_cached_client('deribit') as client,
send_chan as send_chan send_chan as send_chan
): ):
mkt: MktPair
pair: Pair
mkt, pair = await get_mkt_info(sym) mkt, pair = await get_mkt_info(sym)
# build out init msgs according to latest spec # build out init msgs according to latest spec
init_msgs.append( init_msgs.append(
FeedInit( FeedInit(mkt_info=mkt)
mkt_info=mkt,
)
) )
# build `cryptofeed` feed-handle nsym = piker_sym_to_cb_sym(sym)
cf_sym: cryptofeed.Symbol = piker_sym_to_cb_sym(sym)
from_cf: tractor.to_asyncio.LinkedTaskChannel async with maybe_open_price_feed(sym) as stream:
async with maybe_open_price_feed(sym) as from_cf:
# load the "last trades" summary cache = client._pairs
last_trades_res: cryptofeed.LastTradesResult = await client.last_trades(
cb_sym_to_deribit_inst(cf_sym),
count=1,
)
last_trades: list[Trade] = last_trades_res.trades
# TODO, do we even need this or will the above always last_trades = (await client.last_trades(
# work? cb_sym_to_deribit_inst(nsym), count=1)).trades
# if not last_trades:
# await tractor.pause()
# async for typ, quote in from_cf:
# if typ == 'trade':
# last_trade = Trade(**(quote['data']))
# break
# else: if len(last_trades) == 0:
last_trade = Trade( last_trade = None
**(last_trades[0]) async for typ, quote in stream:
) if typ == 'trade':
last_trade = Trade(**(quote['data']))
break
first_quote: dict = { else:
last_trade = Trade(**(last_trades[0]))
first_quote = {
'symbol': sym, 'symbol': sym,
'last': last_trade.price, 'last': last_trade.price,
'brokerd_ts': last_trade.timestamp, 'brokerd_ts': last_trade.timestamp,
@ -304,84 +243,13 @@ async def stream_quotes(
'broker_ts': last_trade.timestamp 'broker_ts': last_trade.timestamp
}] }]
} }
task_status.started(( task_status.started((init_msgs, first_quote))
init_msgs,
first_quote,
))
feed_is_live.set() feed_is_live.set()
# NOTE XXX, static for now! async for typ, quote in stream:
# => since this only handles ONE mkt feed at a time we topic = quote['symbol']
# don't need a lookup table to map interleaved quotes await send_chan.send({topic: quote})
# from multiple possible mkt-pairs
topic: str = mkt.bs_fqme
# deliver until cancelled
async for typ, ref in from_cf:
match typ:
case 'trade':
trade: cryptofeed.types.Trade = ref
# TODO, re-impl this according to teh ideal
# fqme for opts that we choose!!
bs_fqme: str = cb_sym_to_deribit_inst(
str_to_cb_sym(trade.symbol)
).lower()
piker_quote: dict = {
'symbol': bs_fqme,
'last': trade.price,
'broker_ts': time.time(),
# ^TODO, name this `brokerd/datad_ts` and
# use `time.time_ns()` ??
'ticks': [{
'type': 'trade',
'price': float(trade.price),
'size': float(trade.amount),
'broker_ts': trade.timestamp,
}],
}
log.info(
f'deribit {typ!r} quote for {sym!r}\n\n'
f'{trade}\n\n'
f'{pfmt(piker_quote)}\n'
)
case 'l1':
book: cryptofeed.types.L1Book = ref
# TODO, so this is where we can possibly change things
# and instead lever the `MktPair.bs_fqme: str` output?
bs_fqme: str = cb_sym_to_deribit_inst(
str_to_cb_sym(book.symbol)
).lower()
piker_quote: dict = {
'symbol': bs_fqme,
'ticks': [
{'type': 'bid',
'price': float(book.bid_price),
'size': float(book.bid_size)},
{'type': 'bsize',
'price': float(book.bid_price),
'size': float(book.bid_size),},
{'type': 'ask',
'price': float(book.ask_price),
'size': float(book.ask_size),},
{'type': 'asize',
'price': float(book.ask_price),
'size': float(book.ask_size),}
]
}
await send_chan.send({
topic: piker_quote,
})
@tractor.context @tractor.context
@ -391,13 +259,13 @@ async def open_symbol_search(
async with open_cached_client('deribit') as client: async with open_cached_client('deribit') as client:
# load all symbols locally for fast search # load all symbols locally for fast search
# cache = client._pairs cache = client._pairs
await ctx.started() await ctx.started()
async with ctx.open_stream() as stream: async with ctx.open_stream() as stream:
pattern: str pattern: str
async for pattern in stream: async for pattern in stream:
# NOTE: pattern fuzzy-matching is done within # NOTE: pattern fuzzy-matching is done within
# the methd impl. # the methd impl.
pairs: dict[str, Pair] = await client.search_symbols( pairs: dict[str, Pair] = await client.search_symbols(


@ -22,10 +22,11 @@ from __future__ import annotations
import pendulum import pendulum
from typing import ( from typing import (
Literal, Literal,
Optional,
) )
from decimal import Decimal from decimal import Decimal
from msgspec import field
from piker.types import Struct from piker.types import Struct
@ -110,21 +111,18 @@ class OptionPair(Pair, frozen=True):
block_trade_min_trade_amount: int # '25' block_trade_min_trade_amount: int # '25'
block_trade_commission: float # '0.003' block_trade_commission: float # '0.003'
# NOTE: see `.data._symcache.SymbologyCache.load()` for why # NOTE: see `.data._symcache.SymbologyCache.load()` for why
ns_path: str = 'piker.brokers.deribit:OptionPair' ns_path: str = 'piker.brokers.deribit:OptionPair'
# TODO, impl this without the MM:SS part of
# the `'THH:MM:SS..'` etc..
@property @property
def expiry(self) -> str: def expiry(self) -> str:
iso_date = pendulum.from_timestamp( iso_date = pendulum.from_timestamp(self.expiration_timestamp / 1000).isoformat()
self.expiration_timestamp / 1000
).isoformat()
return iso_date return iso_date
@property @property
def venue(self) -> str: def venue(self) -> str:
return f'{self.instrument_type}_option' return 'option'
@property @property
def bs_fqme(self) -> str: def bs_fqme(self) -> str:
@ -154,7 +152,6 @@ class JSONRPCResult(Struct):
error: Optional[dict] = None error: Optional[dict] = None
result: Optional[list[dict]] = None result: Optional[list[dict]] = None
class JSONRPCChannel(Struct): class JSONRPCChannel(Struct):
method: str method: str
params: dict params: dict
@ -171,7 +168,6 @@ class KLinesResult(Struct):
status: str status: str
volume: list[float] volume: list[float]
class Trade(Struct): class Trade(Struct):
iv: float iv: float
price: float price: float
@ -190,7 +186,6 @@ class Trade(Struct):
block_trade_id: Optional[str] = '', block_trade_id: Optional[str] = '',
block_trade_leg_count: Optional[int] = 0, block_trade_leg_count: Optional[int] = 0,
class LastTradesResult(Struct): class LastTradesResult(Struct):
trades: list[Trade] trades: list[Trade]
has_more: bool has_more: bool
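
As a quick sanity check on the `expiry` property above (deribit delivers `expiration_timestamp` in milliseconds; the value here is illustrative):

```python
import pendulum

expiration_timestamp: int = 1738310400000  # ms since epoch
iso: str = pendulum.from_timestamp(
    expiration_timestamp / 1000
).isoformat()
print(iso)  # '2025-01-31T08:00:00+00:00', deribit expiries land at 08:00 UTC
```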


@ -30,7 +30,6 @@ import time
from typing import ( from typing import (
Any, Any,
AsyncIterator, AsyncIterator,
Callable,
TYPE_CHECKING, TYPE_CHECKING,
) )
@ -55,9 +54,6 @@ from ._util import (
get_console_log, get_console_log,
) )
from ..service import maybe_spawn_daemon from ..service import maybe_spawn_daemon
from piker.log import (
mk_repr,
)
if TYPE_CHECKING: if TYPE_CHECKING:
from ._sharedmem import ( from ._sharedmem import (
@ -579,6 +575,7 @@ async def open_sample_stream(
async def sample_and_broadcast( async def sample_and_broadcast(
bus: _FeedsBus, # noqa bus: _FeedsBus, # noqa
rt_shm: ShmArray, rt_shm: ShmArray,
hist_shm: ShmArray, hist_shm: ShmArray,
@ -599,22 +596,11 @@ async def sample_and_broadcast(
overruns = Counter() overruns = Counter()
# multiline nested `dict` formatter (since rn quote-msgs are
# just that).
pfmt: Callable[[str], str] = mk_repr()
# iterate stream delivered by broker # iterate stream delivered by broker
async for quotes in quote_stream: async for quotes in quote_stream:
# print(quotes)
# XXX WARNING XXX only enable for debugging bc ow can cost # TODO: ``numba`` this!
# ALOT of perf with HF-feedz!!!
#
# log.info(
# 'Rx live quotes:\n'
# f'{pfmt(quotes)}'
# )
# TODO: `numba` this!
for broker_symbol, quote in quotes.items(): for broker_symbol, quote in quotes.items():
# TODO: in theory you can send the IPC msg *before* writing # TODO: in theory you can send the IPC msg *before* writing
# to the sharedmem array to decrease latency, however, that # to the sharedmem array to decrease latency, however, that
@ -687,18 +673,6 @@ async def sample_and_broadcast(
sub_key: str = broker_symbol.lower() sub_key: str = broker_symbol.lower()
subs: set[Sub] = bus.get_subs(sub_key) subs: set[Sub] = bus.get_subs(sub_key)
if not subs:
all_bs_fqmes: list[str] = list(
bus._subscribers.keys()
)
log.warning(
f'No subscribers for {brokername!r} live-quote ??\n'
f'broker_symbol: {broker_symbol}\n\n'
f'Maybe the backend-sys symbol does not match one of,\n'
f'{pfmt(all_bs_fqmes)}\n'
)
# NOTE: by default the broker backend doesn't append # NOTE: by default the broker backend doesn't append
# it's own "name" into the fqme schema (but maybe it # it's own "name" into the fqme schema (but maybe it
# should?) so we have to manually generate the correct # should?) so we have to manually generate the correct


@ -360,7 +360,7 @@ async def open_autorecon_ws(
''' '''
JSONRPC response-request style machinery for transparent multiplexing JSONRPC response-request style machinery for transparent multiplexing
of msgs over a `NoBsWs`. of msgs over a NoBsWs.
''' '''
@ -377,16 +377,6 @@ async def open_jsonrpc_session(
url: str, url: str,
start_id: int = 0, start_id: int = 0,
response_type: type = JSONRPCResult, response_type: type = JSONRPCResult,
msg_recv_timeout: float = float('inf'),
# ^NOTE, since only `deribit` is using this jsonrpc stuff atm
# and options mkts are generally "slow moving"..
#
# FURTHER if we break the underlying ws connection then since we
# don't pass a `fixture` to the task that manages `NoBsWs`, i.e.
# `_reconnect_forever()`, the jsonrpc "transport pipe" get's
# broken and never restored with wtv init sequence is required to
# re-establish a working req-resp session.
# request_type: Optional[type] = None, # request_type: Optional[type] = None,
# request_hook: Optional[Callable] = None, # request_hook: Optional[Callable] = None,
# error_hook: Optional[Callable] = None, # error_hook: Optional[Callable] = None,
@ -398,18 +388,12 @@ async def open_jsonrpc_session(
async with ( async with (
trio.open_nursery() as n, trio.open_nursery() as n,
open_autorecon_ws( open_autorecon_ws(url) as ws
url=url,
msg_recv_timeout=msg_recv_timeout,
) as ws
): ):
rpc_id: Iterable[int] = count(start_id) rpc_id: Iterable[int] = count(start_id)
rpc_results: dict[int, dict] = {} rpc_results: dict[int, dict] = {}
async def json_rpc( async def json_rpc(method: str, params: dict) -> dict:
method: str,
params: dict,
) -> dict:
''' '''
perform a json rpc call and wait for the result, raise exception in perform a json rpc call and wait for the result, raise exception in
case of error field present on response case of error field present on response
@ -499,7 +483,7 @@ async def open_jsonrpc_session(
# response in original "result" msg, # response in original "result" msg,
# THEN FINALLY set the event to signal caller # THEN FINALLY set the event to signal caller
# to raise the error in the parent task. # to raise the error in the parent task.
req_id: int = msg['id'] req_id: int = error['id']
req_msg: dict = req_msgs[req_id] req_msg: dict = req_msgs[req_id]
result: dict = rpc_results[req_id] result: dict = rpc_results[req_id]
result['error'] = error result['error'] = error
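
For reference, the id-keyed multiplexing that the `req_id` fix above depends on, in minimal form; `rpc_id`/`rpc_results` mirror the diff while the `ws.send_msg()` transport call is assumed:

```python
from itertools import count
import trio

rpc_id = count(0)                  # monotonic request ids
rpc_results: dict[int, dict] = {}  # req_id -> {'result': .., 'event': ..}

async def json_rpc(ws, method: str, params: dict) -> dict:
    req_id: int = next(rpc_id)
    rpc_results[req_id] = {
        'result': None,
        'event': trio.Event(),
    }
    await ws.send_msg({
        'jsonrpc': '2.0',
        'id': req_id,
        'method': method,
        'params': params,
    })
    # a background recv task matches each response back to its request
    # by the response's `msg['id']`, fills in 'result' (or 'error'),
    # then sets the event to wake this caller.
    await rpc_results[req_id]['event'].wait()
    return rpc_results[req_id]['result']
```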


@ -540,10 +540,7 @@ async def open_feed_bus(
# subscription since the backend isn't (yet) expected to # subscription since the backend isn't (yet) expected to
# append it's own name to the fqme, so we filter on keys # append it's own name to the fqme, so we filter on keys
# which *do not* include that name (e.g .ib) . # which *do not* include that name (e.g .ib) .
bus._subscribers.setdefault( bus._subscribers.setdefault(bs_fqme, set())
bs_fqme,
set(),
)
# sync feed subscribers with flume handles # sync feed subscribers with flume handles
await ctx.started( await ctx.started(


@ -18,11 +18,7 @@
Log like a forester! Log like a forester!
""" """
import logging import logging
import reprlib
import json import json
from typing import (
Callable,
)
import tractor import tractor
from pygments import ( from pygments import (
@ -88,27 +84,3 @@ def colorize_json(
# likeable styles: algol_nu, tango, monokai # likeable styles: algol_nu, tango, monokai
formatters.TerminalTrueColorFormatter(style=style) formatters.TerminalTrueColorFormatter(style=style)
) )
def mk_repr(
**repr_kws,
) -> Callable[[str], str]:
'''
Allocate and deliver a `repr.Repr` instance with provided input
settings using the std-lib's `reprlib` mod,
* https://docs.python.org/3/library/reprlib.html
------ Ex. ------
An up to 6-layer-nested `dict` as multi-line:
- https://stackoverflow.com/a/79102479
- https://docs.python.org/3/library/reprlib.html#reprlib.Repr.maxlevel
'''
def_kws: dict[str, int] = dict(
indent=2,
maxlevel=6, # recursion levels
maxstring=66, # match editor line-len limit
)
def_kws |= repr_kws
reprr = reprlib.Repr(**def_kws)
return reprr.repr
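
Typical usage of the `mk_repr()` helper shown above, as in the deribit feed code where it pretty-prints nested quote msgs (self-contained restatement; requires py3.12+ for the `reprlib.Repr` constructor kwargs):

```python
import reprlib

def mk_repr(**repr_kws):
    # same shape as above: a std-lib `reprlib.Repr` with sane defaults
    def_kws: dict = dict(indent=2, maxlevel=6, maxstring=66)
    def_kws |= repr_kws
    return reprlib.Repr(**def_kws).repr

pfmt = mk_repr(maxstring=100)  # allow deribit's mega-long symbol strs
quote = {'symbol': 'btc-29jan25-120000-c', 'ticks': [{'type': 'bid'}]}
print(pfmt(quote))  # depth- and width-limited, multi-line nested repr
```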


@ -161,12 +161,7 @@ class NativeStorageClient:
def index_files(self): def index_files(self):
for path in self._datadir.iterdir(): for path in self._datadir.iterdir():
if ( if path.name in {'borked', 'expired',}:
path.name in {'borked', 'expired',}
or
'.parquet' not in str(path)
):
# ignore all non-apache files (for now)
continue continue
key: str = path.name.rstrip('.parquet') key: str = path.name.rstrip('.parquet')


@ -44,10 +44,8 @@ import trio
from trio_typing import TaskStatus from trio_typing import TaskStatus
import tractor import tractor
from pendulum import ( from pendulum import (
Interval,
DateTime, DateTime,
Duration, Duration,
duration as mk_duration,
from_timestamp, from_timestamp,
) )
import numpy as np import numpy as np
@ -216,8 +214,7 @@ async def maybe_fill_null_segments(
# pair, immediately stop backfilling? # pair, immediately stop backfilling?
if ( if (
start_dt start_dt
and and end_dt < start_dt
end_dt < start_dt
): ):
await tractor.pause() await tractor.pause()
break break
@ -265,7 +262,6 @@ async def maybe_fill_null_segments(
except tractor.ContextCancelled: except tractor.ContextCancelled:
# log.exception # log.exception
await tractor.pause() await tractor.pause()
raise
null_segs_detected.set() null_segs_detected.set()
# RECHECK for more null-gaps # RECHECK for more null-gaps
@ -353,7 +349,7 @@ async def maybe_fill_null_segments(
async def start_backfill( async def start_backfill(
get_hist, get_hist,
def_frame_duration: Duration, frame_types: dict[str, Duration] | None,
mod: ModuleType, mod: ModuleType,
mkt: MktPair, mkt: MktPair,
shm: ShmArray, shm: ShmArray,
@ -383,23 +379,22 @@ async def start_backfill(
update_start_on_prepend: bool = False update_start_on_prepend: bool = False
if backfill_until_dt is None: if backfill_until_dt is None:
# TODO: per-provider default history-durations? # TODO: drop this right and just expose the backfill
# -[ ] inside the `open_history_client()` config allow # limits inside a [storage] section in conf.toml?
# declaring the history duration limits instead of # when no tsdb "last datum" is provided, we just load
# guessing and/or applying the same limits to all? # some near-term history.
# # periods = {
# -[ ] allow declaring (default) per-provider backfill # 1: {'days': 1},
# limits inside a [storage] sub-section in conf.toml? # 60: {'days': 14},
# # }
# NOTE, when no tsdb "last datum" is provided, we just
# load some near-term history by presuming a "decently # do a decently sized backfill and load it into storage.
# large" 60s duration limit and a much shorter 1s range.
periods = { periods = {
1: {'days': 2}, 1: {'days': 2},
60: {'years': 6}, 60: {'years': 6},
} }
period_duration: int = periods[timeframe] period_duration: int = periods[timeframe]
update_start_on_prepend: bool = True update_start_on_prepend = True
# NOTE: manually set the "latest" datetime which we intend to # NOTE: manually set the "latest" datetime which we intend to
# backfill history "until" so as to adhere to the history # backfill history "until" so as to adhere to the history
@ -421,6 +416,7 @@ async def start_backfill(
f'backfill_until_dt: {backfill_until_dt}\n' f'backfill_until_dt: {backfill_until_dt}\n'
f'last_start_dt: {last_start_dt}\n' f'last_start_dt: {last_start_dt}\n'
) )
try: try:
( (
array, array,
@ -430,58 +426,37 @@ async def start_backfill(
timeframe, timeframe,
end_dt=last_start_dt, end_dt=last_start_dt,
) )
except NoData as _daterr: except NoData as _daterr:
orig_last_start_dt: datetime = last_start_dt # 3 cases:
gap_report: str = ( # - frame in the middle of a legit venue gap
f'EMPTY FRAME for `end_dt: {last_start_dt}`?\n' # - history actually began at the `last_start_dt`
f'{mod.name} -> tf@fqme: {timeframe}@{mkt.fqme}\n' # - some other unknown error (ib blocking the
f'last_start_dt: {orig_last_start_dt}\n\n' # history bc they don't want you seeing how they
f'bf_until: {backfill_until_dt}\n' # cucked all the tinas..)
) if dur := frame_types.get(timeframe):
# EMPTY FRAME signal with 3 (likely) causes: # decrement by a frame's worth of duration and
# # retry a few times.
# 1. range contains legit gap in venue history last_start_dt.subtract(
# 2. history actually (edge case) **began** at the seconds=dur.total_seconds()
# value `last_start_dt`
# 3. some other unknown error (ib blocking the
# history-query bc they don't want you seeing how
# they cucked all the tinas.. like with options
# hist)
#
if def_frame_duration:
# decrement by a duration's (frame) worth of time
# as maybe indicated by the backend to see if we
# can get older data before this possible
# "history gap".
last_start_dt: datetime = last_start_dt.subtract(
seconds=def_frame_duration.total_seconds()
) )
gap_report += ( log.warning(
f'Decrementing `end_dt` and retrying with,\n' f'{mod.name} -> EMPTY FRAME for end_dt?\n'
f'def_frame_duration: {def_frame_duration}\n' f'tf@fqme: {timeframe}@{mkt.fqme}\n'
f'(new) last_start_dt: {last_start_dt}\n' 'bf_until <- last_start_dt:\n'
f'{backfill_until_dt} <- {last_start_dt}\n'
f'Decrementing `end_dt` by {dur} and retry..\n'
) )
log.warning(gap_report)
# skip writing to shm/tsdb and try the next
# duration's worth of prior history.
continue continue
else:
# await tractor.pause()
raise DataUnavailable(gap_report)
# broker says there never was or is no more history to pull # broker says there never was or is no more history to pull
except DataUnavailable as due: except DataUnavailable:
message: str = due.args[0]
log.warning( log.warning(
f'Provider {mod.name!r} halted backfill due to,\n\n' f'NO-MORE-DATA in range?\n'
f'`{mod.name}` halted history:\n'
f'{message}\n' f'tf@fqme: {timeframe}@{mkt.fqme}\n'
'bf_until <- last_start_dt:\n'
f'fqme: {mkt.fqme}\n' f'{backfill_until_dt} <- {last_start_dt}\n'
f'timeframe: {timeframe}\n'
f'last_start_dt: {last_start_dt}\n'
f'bf_until: {backfill_until_dt}\n'
) )
# UGH: what's a better way? # UGH: what's a better way?
# TODO: backends are responsible for being correct on # TODO: backends are responsible for being correct on
@ -490,54 +465,34 @@ async def start_backfill(
# to halt the request loop until the condition is # to halt the request loop until the condition is
# resolved or should the backend be entirely in # resolved or should the backend be entirely in
# charge of solving such faults? yes, right? # charge of solving such faults? yes, right?
# if timeframe > 1:
# await tractor.pause()
return return
time: np.ndarray = array['time']
assert ( assert (
time[0] array['time'][0]
== ==
next_start_dt.timestamp() next_start_dt.timestamp()
) )
assert time[-1] == next_end_dt.timestamp() diff = last_start_dt - next_start_dt
frame_time_diff_s = diff.seconds
expected_dur: Interval = last_start_dt - next_start_dt
# frame's worth of sample-period-steps, in seconds # frame's worth of sample-period-steps, in seconds
frame_size_s: float = len(array) * timeframe frame_size_s: float = len(array) * timeframe
recv_frame_dur: Duration = ( expected_frame_size_s: float = frame_size_s + timeframe
from_timestamp(array[-1]['time']) if frame_time_diff_s > expected_frame_size_s:
-
from_timestamp(array[0]['time'])
)
if (
(lt_frame := (recv_frame_dur < expected_dur))
or
(null_frame := (frame_size_s == 0))
# ^XXX, should NEVER hit now!
):
# XXX: query result includes a start point prior to our # XXX: query result includes a start point prior to our
# expected "frame size" and thus is likely some kind of # expected "frame size" and thus is likely some kind of
# history gap (eg. market closed period, outage, etc.) # history gap (eg. market closed period, outage, etc.)
# so just report it to console for now. # so just report it to console for now.
if lt_frame:
reason = 'Possible GAP (or first-datum)'
else:
assert null_frame
reason = 'NULL-FRAME'
missing_dur: Interval = expected_dur.end - recv_frame_dur.end
log.warning( log.warning(
f'{timeframe}s-series {reason} detected!\n' 'GAP DETECTED:\n'
f'fqme: {mkt.fqme}\n' f'last_start_dt: {last_start_dt}\n'
f'last_start_dt: {last_start_dt}\n\n' f'diff: {diff}\n'
f'recv interval: {recv_frame_dur}\n' f'frame_time_diff_s: {frame_time_diff_s}\n'
f'expected interval: {expected_dur}\n\n'
f'Missing duration of history of {missing_dur.in_words()!r}\n'
f'{missing_dur}\n'
) )
# await tractor.pause()
to_push = diff_history( to_push = diff_history(
array, array,
@ -612,8 +567,7 @@ async def start_backfill(
# long-term storage. # long-term storage.
if ( if (
storage is not None storage is not None
and and write_tsdb
write_tsdb
): ):
log.info( log.info(
f'Writing {ln} frame to storage:\n' f'Writing {ln} frame to storage:\n'
@ -734,7 +688,7 @@ async def back_load_from_tsdb(
last_tsdb_dt last_tsdb_dt
and latest_start_dt and latest_start_dt
): ):
backfilled_size_s: Duration = ( backfilled_size_s = (
latest_start_dt - last_tsdb_dt latest_start_dt - last_tsdb_dt
).seconds ).seconds
# if the shm buffer len is not large enough to contain # if the shm buffer len is not large enough to contain
@ -957,8 +911,6 @@ async def tsdb_backfill(
f'{pformat(config)}\n' f'{pformat(config)}\n'
) )
# concurrently load the provider's most-recent-frame AND any
# pre-existing tsdb history already saved in `piker` storage.
dt_eps: list[DateTime, DateTime] = [] dt_eps: list[DateTime, DateTime] = []
async with trio.open_nursery() as tn: async with trio.open_nursery() as tn:
tn.start_soon( tn.start_soon(
@ -969,6 +921,7 @@ async def tsdb_backfill(
timeframe, timeframe,
config, config,
) )
tsdb_entry: tuple = await load_tsdb_hist( tsdb_entry: tuple = await load_tsdb_hist(
storage, storage,
mkt, mkt,
@ -997,25 +950,6 @@ async def tsdb_backfill(
mr_end_dt, mr_end_dt,
) = dt_eps ) = dt_eps
first_frame_dur_s: Duration = (mr_end_dt - mr_start_dt).seconds
calced_frame_size: Duration = mk_duration(
seconds=first_frame_dur_s,
)
# NOTE, attempt to use the backend declared default frame
# sizing (as allowed by their time-series query APIs) and
# if not provided try to construct a default from the
# first frame received above.
def_frame_durs: dict[
int,
Duration,
]|None = config.get('frame_types', None)
if def_frame_durs:
def_frame_size: Duration = def_frame_durs[timeframe]
assert def_frame_size == calced_frame_size
else:
# use what we calced from first frame above.
def_frame_size = calced_frame_size
# NOTE: when there's no offline data, there's 2 cases: # NOTE: when there's no offline data, there's 2 cases:
# - data backend doesn't support timeframe/sample # - data backend doesn't support timeframe/sample
# period (in which case `dt_eps` should be `None` and # period (in which case `dt_eps` should be `None` and
@ -1046,7 +980,7 @@ async def tsdb_backfill(
partial( partial(
start_backfill, start_backfill,
get_hist=get_hist, get_hist=get_hist,
def_frame_duration=def_frame_size, frame_types=config.get('frame_types', None),
mod=mod, mod=mod,
mkt=mkt, mkt=mkt,
shm=shm, shm=shm,
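
The gap check above reduces to comparing the requested interval against what the received frame actually spans; a toy example with pendulum (timestamps made up):

```python
from pendulum import datetime, from_timestamp

last_start_dt = datetime(2025, 1, 29, 13, 0)  # requested frame end
next_start_dt = datetime(2025, 1, 29, 12, 0)  # received frame start
expected_dur = last_start_dt - next_start_dt  # 1h Interval

# pretend the backend only returned bars spanning 12:00 -> 12:40 UTC
first_ts: float = datetime(2025, 1, 29, 12, 0).timestamp()
last_ts: float = datetime(2025, 1, 29, 12, 40).timestamp()
recv_frame_dur = from_timestamp(last_ts) - from_timestamp(first_ts)

if recv_frame_dur < expected_dur:
    print('possible GAP (or first-datum)')  # 40m < 1h -> gap
```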

poetry.lock (generated): 1751 lines changed

File diff suppressed because it is too large


@ -25,11 +25,11 @@ build-backend = "poetry.core.masonry.api"
ignore = [] ignore = []
# https://docs.astral.sh/ruff/settings/#lint_per-file-ignores # https://docs.astral.sh/ruff/settings/#lint_per-file-ignores
# "piker/ui/qt.py" = [ "piker/ui/qt.py" = [
# "E402", "E402",
# 'F401', # unused imports (without __all__ or blah as blah) 'F401', # unused imports (without __all__ or blah as blah)
# # "F841", # unused variable rules # "F841", # unused variable rules
# ] ]
# ignore-init-module-imports = false # ignore-init-module-imports = false
# ------ - ------ # ------ - ------
@ -72,8 +72,12 @@ httpx = "^0.27.0"
cryptofeed = "^2.4.0" cryptofeed = "^2.4.0"
pyarrow = "^17.0.0" pyarrow = "^17.0.0"
tractor = {path = "../tractor", develop = true} [tool.poetry.dependencies.tractor]
websockets = "12.0" develop = true
git = 'https://pikers.dev/goodboy/tractor.git'
branch = 'aio_abandons'
# path = "../tractor"
[tool.poetry.dependencies.asyncvnc] [tool.poetry.dependencies.asyncvnc]
git = 'https://github.com/pikers/asyncvnc.git' git = 'https://github.com/pikers/asyncvnc.git'
branch = 'main' branch = 'main'