Compare commits

...

26 Commits

Author SHA1 Message Date
goodboy 5e371f1d73 Merge pull request 'jsonrpc_err_in_rent' (#41) from jsonrpc_err_in_rent into gitea_feats
Reviewed-on: #41
2025-02-21 21:21:02 +00:00
goodboy 6c221bb348 Merge pull request 'tsp_gaps: fixes for fault-less OHLCV time-series loads' (#35) from tsp_gaps into gitea_feats
Reviewed-on: #35
2025-02-21 20:46:37 +00:00
Tyler Goodlet e391c896f8 Mk jsronrpc's underlying ws timeout `float('inf')`
Since currently we're only using this IPC subsys for `deribit`, and
generally speaking we're primarly supporting options markets (which are
fairly "slow moving"), flip to a default of NOT resetting the `NoBsWs`
on timeout since doing so normally breaks the jsron-rpc IPC session.
Without a proper `fixture` passed to `open_autorecon_ws()` (which we
should eventually implement!!) relying on a timeout-to-reset more or
less will just cause breakage issues - a proper reconnect sequence must
be implemented before using that feature.

Deats,
- expose and proxy through the `msg_recv_timeout` from
  `open_jsonrpc_session()` into the underlying `open_autorecon_ws()`
  call.
2025-02-19 17:05:13 -05:00
Tyler Goodlet 5633f5614d Doc-n-clean `.data._web_bs.open_jsonrpc_session()`
Add a doc-string reflecting recent refinements, drop all the old hook
params, rename `n: trio.Nursery` -> `tn` for "task nursery" fitting with
code base's naming style.
2025-02-19 17:05:13 -05:00
Tyler Goodlet 76735189de data._web_bs: try to raise jsonrpc errors in parent task 2025-02-19 17:05:13 -05:00
Tyler Goodlet d49608f74e Refine history gap/termination signalling
Namely handling backends which do not provide a default "frame
size-duration" in their init-config by making the backfiller guess the
value based on the first frame received.

Deats,
- adjust `start_backfill()` to take a more explicit
  `def_frame_duration: Duration` expected to be unpacked from any
  backend hist init-config by the `tsdb_backfill()` caller which now
  also computes a value from the first received frame when the config
  section isn't provided.
- in `start_backfill()` we now always expect the `def_frame_duration`
  input and always decrement the query range by this value whenever
  a `NoData` is raised by the provider-backend paired with an explicit
  `log.warning()` about the handling.
- also relay any `DataUnavailable.args[0]` message from the provider
  in the handler.
- repair "gap reporting" which checks for expected frame duration vs.
  that received with much better humanized logging on the missing
  segment using `pendulum.Interval/Duration.in_words()` output.
2025-02-19 17:01:24 -05:00
Tyler Goodlet bf0ac93aa3 Only use `frame_types` if delivered during enter
The `open_history_client()` provider endpoint can *optionally*
deliver a `frame_types: dict[int, pendulum.Duration]` subsection in its
`config: dict[str, dict]` (as was implemented with the `ib` backend).
This allows the `tsp` backfilling machinery to use this "recommended
frame duration" to subtract from the `last_start_dt` any time a `NoData`
gap is signalled by the `get_hist()` call allowing gaps to be ignored
safely without missing history by knowing the next earliest dt we can
query from using the `end_dt`. However, currently all crypto$ providers
haven't implemented this feat yet..

As such only try to use the `frame_types` feature if provided when
handling `NoData` conditions inside `tsp.start_backfill()` and otherwise
raise as normal.
2025-02-19 17:01:24 -05:00
Tyler Goodlet d7179d47b0 `.tsp._anal`: add (unused) `detect_vlm_gaps()` 2025-02-19 17:01:24 -05:00
Tyler Goodlet c390e87536 `.storage.cli`: collect gap-markup-aids into `tf2aids: dict` prior to pause for introspection 2025-02-19 17:01:24 -05:00
Tyler Goodlet 5e4a6d61c7 Ignore any non-`.parquet` files under `.config/piker/nativedb/` subdir 2025-02-19 17:01:24 -05:00
Tyler Goodlet 3caaa30b03 Mask no-data pause, add perps to no-`/src`-in-fqme asset set
Was orig for debugging an issue with `kucoin` i think but definitely
shouldn't be left in XD

Also add `'perpetual_future'` to the `.start_backfill()` input literal
set since we don't expect the 'btc/usd.perp.binance' for now.
2025-02-19 17:01:24 -05:00
goodboy 1e3942fdc2 Merge pull request 'Add `ruff` to deps, bump `uv.lock`' (#32) from add_ruff_linter into gitea_feats
Reviewed-on: #32
2025-02-17 19:55:32 +00:00
Nelson Torres 49ea380503 Add new `ruff.toml` config file
Based on the default provided in their
[docs](https://docs.astral.sh/ruff/configuration/) and migrating
previous config from the prior `poetry`-verion of our `pyproject.toml`
2025-02-17 14:48:10 -05:00
Tyler Goodlet 933f169938 Add/reorg back some content from `poetry` old config 2025-02-14 13:47:02 -05:00
Nelson Torres 51337052a4 Remove legacy `poetry` config content from pyproject.toml 2025-02-14 15:01:29 -03:00
Tyler Goodlet 8abe55dcc6 Add `ruff` to deps, bump `uv.lock`
Such that we start encouraging devs to lint code they touch and
hopefully we include a pass as part of our tests/CI eventually B)

Also, mk local `tractor` install `--editable` since without it being
a locally hackable repo it's kinda pointless to install from the local
fs Xp
2025-02-13 21:20:11 -05:00
goodboy c933f2ad56 Merge pull request 'kucoin_and_binance_fix' (#9) from kucoin_and_binance_fix into gitea_feats
Reviewed-on: #9
2025-02-13 19:40:50 +00:00
Tyler Goodlet 00108010c9 Mask `pytest` detection block in `piker.config`
Seems to be some kinda super weird env bug since we moved to using
`uv`? When it triggers it also seems to cause a pretty fundamental crash
that not only breaks `tractor.devx._debug` stuff but also seems to get
us in a perma-hang state where no SIGINT or other sys sig will be able
to kill the root proc!?!?

TODO, a `gitea` issue to track so we can fix the fundamental problem as
well as transitive fault in `tractor`'s core which seems to be due to
the error taking place during a sub-actor's module import phase which
prevents the runtime from booting fully and then the proc getting stuck
in a real gnarly SIG-state..
2025-02-13 13:32:11 -05:00
Tyler Goodlet 8a4901c517 `.binance.feed`: moar type fixes, drop `rapidfuzz` 2025-02-13 12:35:41 -05:00
Tyler Goodlet d7f6a5ab63 Update to latest `KucoinMktPair` spec 2025-02-12 18:08:40 -05:00
Tyler Goodlet e0fdabf651 Use `../tractor` srcs in editable mode? 2025-02-12 15:14:30 -05:00
Tyler Goodlet cb88dfc9da `kucoin`: repair live quotes streaming..
This must have broke at some point during the new `MktPair` and thus
`.fqme: str` updates; mas-o-menos the symbol key in the quote-msg-`dict`
was NOT set to the `MktPair.bs_fqme: str` value and thus wasn't being
processed by the downstream sampling and feed subsys.

So fix that as well as a few other refinements,
- set the `topic: mkt.bs_fqme` in quote msgs obvi.
- drop the "wait for first clearing vlm" quote poll loop; going to fix
  the sampler to handle a `first_quote` without a `'last'` key.
- add some typing around calls to `get_mkt_info()`.
- rename `stream_messages()` -> `iter_normed_quotes()`.
2025-02-12 15:05:22 -05:00
Nelson Torres bb41dd6d18 Deleted settlePlan field from binance FutesPair. 2025-02-12 15:05:22 -05:00
Nelson Torres 99e90129ad Added missing fields for kucoin.
feeCategory, makerFeeCoefficient, takerFeeCoefficient and st.
2025-02-12 15:05:22 -05:00
Tyler Goodlet cceb7a37b9 Lel, forgot to add a `SPOT` venue for `binance`.. 2025-02-12 15:05:22 -05:00
goodboy 5382815b2d Merge pull request 'uv migration and default.nix for qt6' (#17) from uv_migration into gitea_feats
Reviewed-on: #17
2025-02-12 20:04:02 +00:00
13 changed files with 459 additions and 156 deletions

View File

@ -567,6 +567,7 @@ class Client:
) -> str:
return {
'USDTM': 'usdtm_futes',
'SPOT': 'spot',
# 'COINM': 'coin_futes',
# ^-TODO-^ bc someone might want it..?
}[pair.venue]

View File

@ -42,7 +42,6 @@ from trio_typing import TaskStatus
from pendulum import (
from_timestamp,
)
from rapidfuzz import process as fuzzy
import numpy as np
import tractor
@ -111,6 +110,7 @@ class AggTrade(Struct, frozen=True):
async def stream_messages(
ws: NoBsWs,
) -> AsyncGenerator[NoBsWs, dict]:
# TODO: match syntax here!
@ -221,6 +221,8 @@ def make_sub(pairs: list[str], sub_name: str, uid: int) -> dict[str, str]:
}
# TODO, why aren't frame resp `log.info()`s showing in upstream
# code?!
@acm
async def open_history_client(
mkt: MktPair,
@ -463,6 +465,8 @@ async def stream_quotes(
):
init_msgs: list[FeedInit] = []
for sym in symbols:
mkt: MktPair
pair: Pair
mkt, pair = await get_mkt_info(sym)
# build out init msgs according to latest spec
@ -511,7 +515,6 @@ async def stream_quotes(
# start streaming
async for typ, quote in msg_gen:
# period = time.time() - last
# hz = 1/period if period else float('inf')
# if hz > 60:
@ -547,7 +550,7 @@ async def open_symbol_search(
)
# repack in fqme-keyed table
byfqme: dict[start, Pair] = {}
byfqme: dict[str, Pair] = {}
for pair in pairs.values():
byfqme[pair.bs_fqme] = pair

View File

@ -181,7 +181,6 @@ class FutesPair(Pair):
quoteAsset: str # 'USDT',
quotePrecision: int # 8,
requiredMarginPercent: float # '5.0000',
settlePlan: int # 0,
timeInForce: list[str] # ['GTC', 'IOC', 'FOK', 'GTX'],
triggerProtect: float # '0.0500',
underlyingSubType: list[str] # ['PoW'],

View File

@ -62,7 +62,7 @@ from piker._cacheables import (
)
from piker.log import get_logger
from piker.data.validate import FeedInit
from piker.types import Struct
from piker.types import Struct # NOTE, this is already a `tractor.msg.Struct`
from piker.data import (
def_iohlcv_fields,
match_from_pairs,
@ -98,9 +98,18 @@ class KucoinMktPair(Struct, frozen=True):
def size_tick(self) -> Decimal:
return Decimal(str(self.quoteMinSize))
callauctionFirstStageStartTime: None|float
callauctionIsEnabled: bool
callauctionPriceCeiling: float|None
callauctionPriceFloor: float|None
callauctionSecondStageStartTime: float|None
callauctionThirdStageStartTime: float|None
enableTrading: bool
feeCategory: int
feeCurrency: str
isMarginEnabled: bool
makerFeeCoefficient: float
market: str
minFunds: float
name: str
@ -110,7 +119,10 @@ class KucoinMktPair(Struct, frozen=True):
quoteIncrement: float
quoteMaxSize: float
quoteMinSize: float
st: bool
symbol: str # our bs_mktid, kucoin's internal id
takerFeeCoefficient: float
tradingStartTime: float|None
class AccountTrade(Struct, frozen=True):
@ -392,7 +404,13 @@ class Client:
pairs: dict[str, KucoinMktPair] = {}
fqmes2mktids: bidict[str, str] = bidict()
for item in entries:
pair = pairs[item['name']] = KucoinMktPair(**item)
try:
pair = pairs[item['name']] = KucoinMktPair(**item)
except TypeError as te:
raise TypeError(
'`KucoinMktPair` and reponse fields do not match ??\n'
f'{KucoinMktPair.fields_diff(item)}\n'
) from te
fqmes2mktids[
item['name'].lower().replace('-', '')
] = pair.name
@ -593,7 +611,7 @@ async def get_client() -> AsyncGenerator[Client, None]:
'''
async with (
httpx.AsyncClient(
base_url=f'https://api.kucoin.com/api',
base_url='https://api.kucoin.com/api',
) as trio_client,
):
client = Client(httpx_client=trio_client)
@ -637,7 +655,7 @@ async def open_ping_task(
await trio.sleep((ping_interval - 1000) / 1000)
await ws.send_msg({'id': connect_id, 'type': 'ping'})
log.info('Starting ping task for kucoin ws connection')
log.warning('Starting ping task for kucoin ws connection')
n.start_soon(ping_server)
yield
@ -649,9 +667,14 @@ async def open_ping_task(
async def get_mkt_info(
fqme: str,
) -> tuple[MktPair, KucoinMktPair]:
) -> tuple[
MktPair,
KucoinMktPair,
]:
'''
Query for and return a `MktPair` and `KucoinMktPair`.
Query for and return both a `piker.accounting.MktPair` and
`KucoinMktPair` from provided `fqme: str`
(fully-qualified-market-endpoint).
'''
async with open_cached_client('kucoin') as client:
@ -726,6 +749,8 @@ async def stream_quotes(
log.info(f'Starting up quote stream(s) for {symbols}')
for sym_str in symbols:
mkt: MktPair
pair: KucoinMktPair
mkt, pair = await get_mkt_info(sym_str)
init_msgs.append(
FeedInit(mkt_info=mkt)
@ -733,7 +758,11 @@ async def stream_quotes(
ws: NoBsWs
token, ping_interval = await client._get_ws_token()
connect_id = str(uuid4())
log.info('API reported ping_interval: {ping_interval}\n')
connect_id: str = str(uuid4())
typ: str
quote: dict
async with (
open_autorecon_ws(
(
@ -747,20 +776,37 @@ async def stream_quotes(
),
) as ws,
open_ping_task(ws, ping_interval, connect_id),
aclosing(stream_messages(ws, sym_str)) as msg_gen,
aclosing(
iter_normed_quotes(
ws, sym_str
)
) as iter_quotes,
):
typ, quote = await anext(msg_gen)
typ, quote = await anext(iter_quotes)
while typ != 'trade':
# take care to not unblock here until we get a real
# trade quote
typ, quote = await anext(msg_gen)
# take care to not unblock here until we get a real
# trade quote?
# ^TODO, remove this right?
# -[ ] what often blocks chart boot/new-feed switching
# since we'ere waiting for a live quote instead of just
# loading history afap..
# |_ XXX, not sure if we require a bit of rework to core
# feed init logic or if backends justg gotta be
# changed up.. feel like there was some causality
# dilema prolly only seen with IB too..
# while typ != 'trade':
# typ, quote = await anext(iter_quotes)
task_status.started((init_msgs, quote))
feed_is_live.set()
async for typ, msg in msg_gen:
await send_chan.send({sym_str: msg})
# XXX NOTE, DO NOT include the `.<backend>` suffix!
# OW the sampling loop will not broadcast correctly..
# since `bus._subscribers.setdefault(bs_fqme, set())`
# is used inside `.data.open_feed_bus()` !!!
topic: str = mkt.bs_fqme
async for typ, quote in iter_quotes:
await send_chan.send({topic: quote})
@acm
@ -815,7 +861,7 @@ async def subscribe(
)
async def stream_messages(
async def iter_normed_quotes(
ws: NoBsWs,
sym: str,
@ -846,6 +892,9 @@ async def stream_messages(
yield 'trade', {
'symbol': sym,
# TODO, is 'last' even used elsewhere/a-good
# semantic? can't we just read the ticks with our
# .data.ticktools.frame_ticks()`/
'last': trade_data.price,
'brokerd_ts': last_trade_ts,
'ticks': [
@ -938,7 +987,7 @@ async def open_history_client(
if end_dt is None:
inow = round(time.time())
print(
log.debug(
f'difference in time between load and processing'
f'{inow - times[-1]}'
)

View File

@ -104,14 +104,15 @@ def get_app_dir(
# `tractor`) with the testing dir and check for it whenever we
# detect `pytest` is being used (which it isn't under normal
# operation).
if "pytest" in sys.modules:
import tractor
actor = tractor.current_actor(err_on_no_runtime=False)
if actor: # runtime is up
rvs = tractor._state._runtime_vars
testdirpath = Path(rvs['piker_vars']['piker_test_dir'])
assert testdirpath.exists(), 'piker test harness might be borked!?'
app_name = str(testdirpath)
# if "pytest" in sys.modules:
# import tractor
# actor = tractor.current_actor(err_on_no_runtime=False)
# if actor: # runtime is up
# rvs = tractor._state._runtime_vars
# import pdbp; pdbp.set_trace()
# testdirpath = Path(rvs['piker_vars']['piker_test_dir'])
# assert testdirpath.exists(), 'piker test harness might be borked!?'
# app_name = str(testdirpath)
if platform.system() == 'Windows':
key = "APPDATA" if roaming else "LOCALAPPDATA"

View File

@ -273,7 +273,7 @@ async def _reconnect_forever(
nobsws._connected.set()
await trio.sleep_forever()
except HandshakeError:
log.exception(f'Retrying connection')
log.exception('Retrying connection')
# ws & nursery block ends
@ -359,8 +359,8 @@ async def open_autorecon_ws(
'''
JSONRPC response-request style machinery for transparent multiplexing of msgs
over a NoBsWs.
JSONRPC response-request style machinery for transparent multiplexing
of msgs over a `NoBsWs`.
'''
@ -377,43 +377,82 @@ async def open_jsonrpc_session(
url: str,
start_id: int = 0,
response_type: type = JSONRPCResult,
request_type: Optional[type] = None,
request_hook: Optional[Callable] = None,
error_hook: Optional[Callable] = None,
msg_recv_timeout: float = float('inf'),
# ^NOTE, since only `deribit` is using this jsonrpc stuff atm
# and options mkts are generally "slow moving"..
#
# FURTHER if we break the underlying ws connection then since we
# don't pass a `fixture` to the task that manages `NoBsWs`, i.e.
# `_reconnect_forever()`, the jsonrpc "transport pipe" get's
# broken and never restored with wtv init sequence is required to
# re-establish a working req-resp session.
) -> Callable[[str, dict], dict]:
'''
Init a json-RPC-over-websocket connection to the provided `url`.
A `json_rpc: Callable[[str, dict], dict` is delivered to the
caller for sending requests and a bg-`trio.Task` handles
processing of response msgs including error reporting/raising in
the parent/caller task.
'''
# NOTE, store all request msgs so we can raise errors on the
# caller side!
req_msgs: dict[int, dict] = {}
async with (
trio.open_nursery() as n,
open_autorecon_ws(url) as ws
trio.open_nursery() as tn,
open_autorecon_ws(
url=url,
msg_recv_timeout=msg_recv_timeout,
) as ws
):
rpc_id: Iterable = count(start_id)
rpc_id: Iterable[int] = count(start_id)
rpc_results: dict[int, dict] = {}
async def json_rpc(method: str, params: dict) -> dict:
async def json_rpc(
method: str,
params: dict,
) -> dict:
'''
perform a json rpc call and wait for the result, raise exception in
case of error field present on response
'''
nonlocal req_msgs
req_id: int = next(rpc_id)
msg = {
'jsonrpc': '2.0',
'id': next(rpc_id),
'id': req_id,
'method': method,
'params': params
}
_id = msg['id']
rpc_results[_id] = {
result = rpc_results[_id] = {
'result': None,
'event': trio.Event()
'error': None,
'event': trio.Event(), # signal caller resp arrived
}
req_msgs[_id] = msg
await ws.send_msg(msg)
# wait for reponse before unblocking requester code
await rpc_results[_id]['event'].wait()
ret = rpc_results[_id]['result']
if (maybe_result := result['result']):
ret = maybe_result
del rpc_results[_id]
del rpc_results[_id]
else:
err = result['error']
raise Exception(
f'JSONRPC request failed\n'
f'req: {msg}\n'
f'resp: {err}\n'
)
if ret.error is not None:
raise Exception(json.dumps(ret.error, indent=4))
@ -428,6 +467,7 @@ async def open_jsonrpc_session(
the server side.
'''
nonlocal req_msgs
async for msg in ws:
match msg:
case {
@ -451,19 +491,28 @@ async def open_jsonrpc_session(
'params': _,
}:
log.debug(f'Recieved\n{msg}')
if request_hook:
await request_hook(request_type(**msg))
case {
'error': error
}:
log.warning(f'Recieved\n{error}')
if error_hook:
await error_hook(response_type(**msg))
# retreive orig request msg, set error
# response in original "result" msg,
# THEN FINALLY set the event to signal caller
# to raise the error in the parent task.
req_id: int = error['id']
req_msg: dict = req_msgs[req_id]
result: dict = rpc_results[req_id]
result['error'] = error
result['event'].set()
log.error(
f'JSONRPC request failed\n'
f'req: {req_msg}\n'
f'resp: {error}\n'
)
case _:
log.warning(f'Unhandled JSON-RPC msg!?\n{msg}')
n.start_soon(recv_task)
tn.start_soon(recv_task)
yield json_rpc
n.cancel_scope.cancel()
tn.cancel_scope.cancel()

View File

@ -386,6 +386,8 @@ def ldshm(
open_annot_ctl() as actl,
):
shm_df: pl.DataFrame | None = None
tf2aids: dict[float, dict] = {}
for (
shmfile,
shm,
@ -526,16 +528,17 @@ def ldshm(
new_df,
step_gaps,
)
# last chance manual overwrites in REPL
await tractor.pause()
# await tractor.pause()
assert aids
tf2aids[period_s] = aids
else:
# allow interaction even when no ts problems.
await tractor.pause()
# assert not diff
assert not diff
await tractor.pause()
log.info('Exiting TSP shm anal-izer!')
if shm_df is None:
log.error(

View File

@ -161,7 +161,13 @@ class NativeStorageClient:
def index_files(self):
for path in self._datadir.iterdir():
if path.name in {'borked', 'expired',}:
if (
path.is_dir()
or
'.parquet' not in str(path)
# or
# path.name in {'borked', 'expired',}
):
continue
key: str = path.name.rstrip('.parquet')

View File

@ -44,8 +44,10 @@ import trio
from trio_typing import TaskStatus
import tractor
from pendulum import (
Interval,
DateTime,
Duration,
duration as mk_duration,
from_timestamp,
)
import numpy as np
@ -214,7 +216,8 @@ async def maybe_fill_null_segments(
# pair, immediately stop backfilling?
if (
start_dt
and end_dt < start_dt
and
end_dt < start_dt
):
await tractor.pause()
break
@ -262,6 +265,7 @@ async def maybe_fill_null_segments(
except tractor.ContextCancelled:
# log.exception
await tractor.pause()
raise
null_segs_detected.set()
# RECHECK for more null-gaps
@ -349,7 +353,7 @@ async def maybe_fill_null_segments(
async def start_backfill(
get_hist,
frame_types: dict[str, Duration] | None,
def_frame_duration: Duration,
mod: ModuleType,
mkt: MktPair,
shm: ShmArray,
@ -379,22 +383,23 @@ async def start_backfill(
update_start_on_prepend: bool = False
if backfill_until_dt is None:
# TODO: drop this right and just expose the backfill
# limits inside a [storage] section in conf.toml?
# when no tsdb "last datum" is provided, we just load
# some near-term history.
# periods = {
# 1: {'days': 1},
# 60: {'days': 14},
# }
# do a decently sized backfill and load it into storage.
# TODO: per-provider default history-durations?
# -[ ] inside the `open_history_client()` config allow
# declaring the history duration limits instead of
# guessing and/or applying the same limits to all?
#
# -[ ] allow declaring (default) per-provider backfill
# limits inside a [storage] sub-section in conf.toml?
#
# NOTE, when no tsdb "last datum" is provided, we just
# load some near-term history by presuming a "decently
# large" 60s duration limit and a much shorter 1s range.
periods = {
1: {'days': 2},
60: {'years': 6},
}
period_duration: int = periods[timeframe]
update_start_on_prepend = True
update_start_on_prepend: bool = True
# NOTE: manually set the "latest" datetime which we intend to
# backfill history "until" so as to adhere to the history
@ -416,7 +421,6 @@ async def start_backfill(
f'backfill_until_dt: {backfill_until_dt}\n'
f'last_start_dt: {last_start_dt}\n'
)
try:
(
array,
@ -426,71 +430,114 @@ async def start_backfill(
timeframe,
end_dt=last_start_dt,
)
except NoData as _daterr:
# 3 cases:
# - frame in the middle of a legit venue gap
# - history actually began at the `last_start_dt`
# - some other unknown error (ib blocking the
# history bc they don't want you seeing how they
# cucked all the tinas..)
if dur := frame_types.get(timeframe):
# decrement by a frame's worth of duration and
# retry a few times.
last_start_dt.subtract(
seconds=dur.total_seconds()
orig_last_start_dt: datetime = last_start_dt
gap_report: str = (
f'EMPTY FRAME for `end_dt: {last_start_dt}`?\n'
f'{mod.name} -> tf@fqme: {timeframe}@{mkt.fqme}\n'
f'last_start_dt: {orig_last_start_dt}\n\n'
f'bf_until: {backfill_until_dt}\n'
)
# EMPTY FRAME signal with 3 (likely) causes:
#
# 1. range contains legit gap in venue history
# 2. history actually (edge case) **began** at the
# value `last_start_dt`
# 3. some other unknown error (ib blocking the
# history-query bc they don't want you seeing how
# they cucked all the tinas.. like with options
# hist)
#
if def_frame_duration:
# decrement by a duration's (frame) worth of time
# as maybe indicated by the backend to see if we
# can get older data before this possible
# "history gap".
last_start_dt: datetime = last_start_dt.subtract(
seconds=def_frame_duration.total_seconds()
)
log.warning(
f'{mod.name} -> EMPTY FRAME for end_dt?\n'
f'tf@fqme: {timeframe}@{mkt.fqme}\n'
'bf_until <- last_start_dt:\n'
f'{backfill_until_dt} <- {last_start_dt}\n'
f'Decrementing `end_dt` by {dur} and retry..\n'
gap_report += (
f'Decrementing `end_dt` and retrying with,\n'
f'def_frame_duration: {def_frame_duration}\n'
f'(new) last_start_dt: {last_start_dt}\n'
)
log.warning(gap_report)
# skip writing to shm/tsdb and try the next
# duration's worth of prior history.
continue
# broker says there never was or is no more history to pull
except DataUnavailable:
log.warning(
f'NO-MORE-DATA in range?\n'
f'`{mod.name}` halted history:\n'
f'tf@fqme: {timeframe}@{mkt.fqme}\n'
'bf_until <- last_start_dt:\n'
f'{backfill_until_dt} <- {last_start_dt}\n'
)
else:
# await tractor.pause()
raise DataUnavailable(gap_report)
# ugh, what's a better way?
# TODO: fwiw, we probably want a way to signal a throttle
# condition (eg. with ib) so that we can halt the
# request loop until the condition is resolved?
if timeframe > 1:
await tractor.pause()
# broker says there never was or is no more history to pull
except DataUnavailable as due:
message: str = due.args[0]
log.warning(
f'Provider {mod.name!r} halted backfill due to,\n\n'
f'{message}\n'
f'fqme: {mkt.fqme}\n'
f'timeframe: {timeframe}\n'
f'last_start_dt: {last_start_dt}\n'
f'bf_until: {backfill_until_dt}\n'
)
# UGH: what's a better way?
# TODO: backends are responsible for being correct on
# this right!?
# -[ ] in the `ib` case we could maybe offer some way
# to halt the request loop until the condition is
# resolved or should the backend be entirely in
# charge of solving such faults? yes, right?
return
time: np.ndarray = array['time']
assert (
array['time'][0]
time[0]
==
next_start_dt.timestamp()
)
diff = last_start_dt - next_start_dt
frame_time_diff_s = diff.seconds
assert time[-1] == next_end_dt.timestamp()
expected_dur: Interval = last_start_dt - next_start_dt
# frame's worth of sample-period-steps, in seconds
frame_size_s: float = len(array) * timeframe
expected_frame_size_s: float = frame_size_s + timeframe
if frame_time_diff_s > expected_frame_size_s:
recv_frame_dur: Duration = (
from_timestamp(array[-1]['time'])
-
from_timestamp(array[0]['time'])
)
if (
(lt_frame := (recv_frame_dur < expected_dur))
or
(null_frame := (frame_size_s == 0))
# ^XXX, should NEVER hit now!
):
# XXX: query result includes a start point prior to our
# expected "frame size" and thus is likely some kind of
# history gap (eg. market closed period, outage, etc.)
# so just report it to console for now.
if lt_frame:
reason = 'Possible GAP (or first-datum)'
else:
assert null_frame
reason = 'NULL-FRAME'
missing_dur: Interval = expected_dur.end - recv_frame_dur.end
log.warning(
'GAP DETECTED:\n'
f'last_start_dt: {last_start_dt}\n'
f'diff: {diff}\n'
f'frame_time_diff_s: {frame_time_diff_s}\n'
f'{timeframe}s-series {reason} detected!\n'
f'fqme: {mkt.fqme}\n'
f'last_start_dt: {last_start_dt}\n\n'
f'recv interval: {recv_frame_dur}\n'
f'expected interval: {expected_dur}\n\n'
f'Missing duration of history of {missing_dur.in_words()!r}\n'
f'{missing_dur}\n'
)
# await tractor.pause()
to_push = diff_history(
array,
@ -565,22 +612,27 @@ async def start_backfill(
# long-term storage.
if (
storage is not None
and write_tsdb
and
write_tsdb
):
log.info(
f'Writing {ln} frame to storage:\n'
f'{next_start_dt} -> {last_start_dt}'
)
# always drop the src asset token for
# NOTE, always drop the src asset token for
# non-currency-pair like market types (for now)
#
# THAT IS, for now our table key schema is NOT
# including the dst[/src] source asset token. SO,
# 'tsla.nasdaq.ib' over 'tsla/usd.nasdaq.ib' for
# historical reasons ONLY.
if mkt.dst.atype not in {
'crypto',
'crypto_currency',
'fiat', # a "forex pair"
'perpetual_future', # stupid "perps" from cex land
}:
# for now, our table key schema is not including
# the dst[/src] source asset token.
col_sym_key: str = mkt.get_fqme(
delim_char='',
without_src=True,
@ -685,7 +737,7 @@ async def back_load_from_tsdb(
last_tsdb_dt
and latest_start_dt
):
backfilled_size_s = (
backfilled_size_s: Duration = (
latest_start_dt - last_tsdb_dt
).seconds
# if the shm buffer len is not large enough to contain
@ -908,6 +960,8 @@ async def tsdb_backfill(
f'{pformat(config)}\n'
)
# concurrently load the provider's most-recent-frame AND any
# pre-existing tsdb history already saved in `piker` storage.
dt_eps: list[DateTime, DateTime] = []
async with trio.open_nursery() as tn:
tn.start_soon(
@ -918,7 +972,6 @@ async def tsdb_backfill(
timeframe,
config,
)
tsdb_entry: tuple = await load_tsdb_hist(
storage,
mkt,
@ -947,6 +1000,25 @@ async def tsdb_backfill(
mr_end_dt,
) = dt_eps
first_frame_dur_s: Duration = (mr_end_dt - mr_start_dt).seconds
calced_frame_size: Duration = mk_duration(
seconds=first_frame_dur_s,
)
# NOTE, attempt to use the backend declared default frame
# sizing (as allowed by their time-series query APIs) and
# if not provided try to construct a default from the
# first frame received above.
def_frame_durs: dict[
int,
Duration,
]|None = config.get('frame_types', None)
if def_frame_durs:
def_frame_size: Duration = def_frame_durs[timeframe]
assert def_frame_size == calced_frame_size
else:
# use what we calced from first frame above.
def_frame_size = calced_frame_size
# NOTE: when there's no offline data, there's 2 cases:
# - data backend doesn't support timeframe/sample
# period (in which case `dt_eps` should be `None` and
@ -977,7 +1049,7 @@ async def tsdb_backfill(
partial(
start_backfill,
get_hist=get_hist,
frame_types=config.get('frame_types', None),
def_frame_duration=def_frame_size,
mod=mod,
mkt=mkt,
shm=shm,

View File

@ -616,6 +616,18 @@ def detect_price_gaps(
# ])
...
# TODO: probably just use the null_segs impl above?
def detect_vlm_gaps(
df: pl.DataFrame,
col: str = 'volume',
) -> pl.DataFrame:
vnull: pl.DataFrame = w_dts.filter(
pl.col(col) == 0
)
return vnull
def dedupe(
src_df: pl.DataFrame,
@ -626,7 +638,6 @@ def dedupe(
) -> tuple[
pl.DataFrame, # with dts
pl.DataFrame, # gaps
pl.DataFrame, # with deduplicated dts (aka gap/repeat removal)
int, # len diff between input and deduped
]:
@ -639,19 +650,22 @@ def dedupe(
'''
wdts: pl.DataFrame = with_dts(src_df)
# maybe sort on any time field
if sort:
wdts = wdts.sort(by='time')
# TODO: detect out-of-order segments which were corrected!
# -[ ] report in log msg
# -[ ] possibly return segment sections which were moved?
deduped = wdts
# remove duplicated datetime samples/sections
deduped: pl.DataFrame = wdts.unique(
subset=['dt'],
# subset=['dt'],
subset=['time'],
maintain_order=True,
)
# maybe sort on any time field
if sort:
deduped = deduped.sort(by='time')
# TODO: detect out-of-order segments which were corrected!
# -[ ] report in log msg
# -[ ] possibly return segment sections which were moved?
diff: int = (
wdts.height
-

View File

@ -18,22 +18,6 @@
requires = ["hatchling"]
build-backend = "hatchling.build"
# ------ - ------
[tool.ruff.lint]
# https://docs.astral.sh/ruff/settings/#lint_ignore
ignore = []
# https://docs.astral.sh/ruff/settings/#lint_per-file-ignores
"piker/ui/qt.py" = [
"E402",
'F401', # unused imports (without __all__ or blah as blah)
# "F841", # unused variable rules
]
# ignore-init-module-imports = false
# ------ - ------
[project]
name = "piker"
version = "0.1.0a0dev0"
@ -103,15 +87,23 @@ uis = [
"pyqt6 >=6.7.0, <7.0.0",
"pyqtgraph",
# ------ - ------
# for consideration,
# - 'visidata'
# TODO: add an `--only daemon` group for running non-ui / pikerd
# service tree in distributed mode B)
# https://docs.astral.sh/uv/concepts/projects/dependencies/#optional-dependencies
# [project.optional-dependencies]
]
[dependency-groups]
# TODO: a toolset that makes debugging a `pikerd` service (tree) easy
# to hack on directly using more or less the local env:
# - xonsh + xxh
# - rsyscall + pdbp
# - actor runtime control console like BEAM/OTP
#
# console ehancements and eventually remote debugging extras/helpers.
# use `uv --dev` to enable
dev = [
"pytest >=6.0.0, <7.0.0",
"elasticsearch >=8.9.0, <9.0.0",
@ -119,13 +111,7 @@ dev = [
"prompt-toolkit ==3.0.40",
"cython >=3.0.0, <4.0.0",
"greenback >=1.1.1, <2.0.0",
# console ehancements and eventually remote debugging
# extras/helpers.
# TODO: add a toolset that makes debugging a `pikerd` service
# (tree) easy to hack on directly using more or less the local env:
# - xonsh + xxh
# - rsyscall + pdbp
# - actor runtime control console like BEAM/OTP
"ruff>=0.9.6",
]
[project.scripts]
@ -144,4 +130,4 @@ pyqtgraph = { git = "https://github.com/pikers/pyqtgraph.git" }
asyncvnc = { git = "https://github.com/pikers/asyncvnc.git", branch = "main" }
tomlkit = { git = "https://github.com/pikers/tomlkit.git", branch ="piker_pin" }
msgspec = { git = "https://github.com/jcrist/msgspec.git" }
tractor = { path = "../tractor" }
tractor = { path = "../tractor", editable = true }

93
ruff.toml 100644
View File

@ -0,0 +1,93 @@
# from default `ruff.toml` @
# https://docs.astral.sh/ruff/configuration/
# Exclude a variety of commonly ignored directories.
exclude = [
".bzr",
".direnv",
".eggs",
".git",
".git-rewrite",
".hg",
".ipynb_checkpoints",
".mypy_cache",
".nox",
".pants.d",
".pyenv",
".pytest_cache",
".pytype",
".ruff_cache",
".svn",
".tox",
".venv",
".vscode",
"__pypackages__",
"_build",
"buck-out",
"build",
"dist",
"node_modules",
"site-packages",
"venv",
]
# Same as Black.
line-length = 88
indent-width = 4
# Assume Python 3.9
target-version = "py312"
# ------ - ------
# TODO, stop warnings around `anext()` builtin use?
# tool.ruff.target-version = "py310"
[lint]
# Enable Pyflakes (`F`) and a subset of the pycodestyle (`E`) codes by default.
# Unlike Flake8, Ruff doesn't enable pycodestyle warnings (`W`) or
# McCabe complexity (`C901`) by default.
select = ["E4", "E7", "E9", "F"]
ignore = []
ignore-init-module-imports = false
[lint.per-file-ignores]
"piker/ui/qt.py" = [
"E402",
'F401', # unused imports (without __all__ or blah as blah)
# "F841", # unused variable rules
]
# Allow fix for all enabled rules (when `--fix`) is provided.
fixable = ["ALL"]
unfixable = []
# Allow unused variables when underscore-prefixed.
dummy-variable-rgx = "^(_+|(_+[a-zA-Z0-9_]*[a-zA-Z0-9]+?))$"
[format]
# Use single quotes in `ruff format`.
quote-style = "single"
# Like Black, indent with spaces, rather than tabs.
indent-style = "space"
# Like Black, respect magic trailing commas.
skip-magic-trailing-comma = false
# Like Black, automatically detect the appropriate line ending.
line-ending = "auto"
# Enable auto-formatting of code examples in docstrings. Markdown,
# reStructuredText code/literal blocks and doctests are all supported.
#
# This is currently disabled by default, but it is planned for this
# to be opt-out in the future.
docstring-code-format = false
# Set the line length limit used when formatting code snippets in
# docstrings.
#
# This only has an effect when the `docstring-code-format` setting is
# enabled.
docstring-code-line-length = "dynamic"

31
uv.lock
View File

@ -708,6 +708,7 @@ dev = [
{ name = "greenback" },
{ name = "prompt-toolkit" },
{ name = "pytest" },
{ name = "ruff" },
{ name = "xonsh" },
]
@ -739,7 +740,7 @@ requires-dist = [
{ name = "tomli", specifier = ">=2.0.1,<3.0.0" },
{ name = "tomli-w", specifier = ">=1.0.0,<2.0.0" },
{ name = "tomlkit", git = "https://github.com/pikers/tomlkit.git?branch=piker_pin" },
{ name = "tractor", directory = "../tractor" },
{ name = "tractor", editable = "../tractor" },
{ name = "trio", specifier = ">=0.24,<0.25" },
{ name = "trio-util", specifier = ">=0.7.0,<0.8.0" },
{ name = "trio-websocket", specifier = ">=0.10.3,<0.11.0" },
@ -754,6 +755,7 @@ dev = [
{ name = "greenback", specifier = ">=1.1.1,<2.0.0" },
{ name = "prompt-toolkit", specifier = "==3.0.40" },
{ name = "pytest", specifier = ">=6.0.0,<7.0.0" },
{ name = "ruff", specifier = ">=0.9.6" },
{ name = "xonsh", specifier = ">=0.14.2,<0.15.0" },
]
@ -1073,6 +1075,31 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/19/71/39c7c0d87f8d4e6c020a393182060eaefeeae6c01dab6a84ec346f2567df/rich-13.9.4-py3-none-any.whl", hash = "sha256:6049d5e6ec054bf2779ab3358186963bac2ea89175919d699e378b99738c2a90", size = 242424 },
]
[[package]]
name = "ruff"
version = "0.9.6"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/2a/e1/e265aba384343dd8ddd3083f5e33536cd17e1566c41453a5517b5dd443be/ruff-0.9.6.tar.gz", hash = "sha256:81761592f72b620ec8fa1068a6fd00e98a5ebee342a3642efd84454f3031dca9", size = 3639454 }
wheels = [
{ url = "https://files.pythonhosted.org/packages/76/e3/3d2c022e687e18cf5d93d6bfa2722d46afc64eaa438c7fbbdd603b3597be/ruff-0.9.6-py3-none-linux_armv6l.whl", hash = "sha256:2f218f356dd2d995839f1941322ff021c72a492c470f0b26a34f844c29cdf5ba", size = 11714128 },
{ url = "https://files.pythonhosted.org/packages/e1/22/aff073b70f95c052e5c58153cba735748c9e70107a77d03420d7850710a0/ruff-0.9.6-py3-none-macosx_10_12_x86_64.whl", hash = "sha256:b908ff4df65dad7b251c9968a2e4560836d8f5487c2f0cc238321ed951ea0504", size = 11682539 },
{ url = "https://files.pythonhosted.org/packages/75/a7/f5b7390afd98a7918582a3d256cd3e78ba0a26165a467c1820084587cbf9/ruff-0.9.6-py3-none-macosx_11_0_arm64.whl", hash = "sha256:b109c0ad2ececf42e75fa99dc4043ff72a357436bb171900714a9ea581ddef83", size = 11132512 },
{ url = "https://files.pythonhosted.org/packages/a6/e3/45de13ef65047fea2e33f7e573d848206e15c715e5cd56095589a7733d04/ruff-0.9.6-py3-none-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:1de4367cca3dac99bcbd15c161404e849bb0bfd543664db39232648dc00112dc", size = 11929275 },
{ url = "https://files.pythonhosted.org/packages/7d/f2/23d04cd6c43b2e641ab961ade8d0b5edb212ecebd112506188c91f2a6e6c/ruff-0.9.6-py3-none-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:ac3ee4d7c2c92ddfdaedf0bf31b2b176fa7aa8950efc454628d477394d35638b", size = 11466502 },
{ url = "https://files.pythonhosted.org/packages/b5/6f/3a8cf166f2d7f1627dd2201e6cbc4cb81f8b7d58099348f0c1ff7b733792/ruff-0.9.6-py3-none-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:5dc1edd1775270e6aa2386119aea692039781429f0be1e0949ea5884e011aa8e", size = 12676364 },
{ url = "https://files.pythonhosted.org/packages/f5/c4/db52e2189983c70114ff2b7e3997e48c8318af44fe83e1ce9517570a50c6/ruff-0.9.6-py3-none-manylinux_2_17_ppc64.manylinux2014_ppc64.whl", hash = "sha256:4a091729086dffa4bd070aa5dab7e39cc6b9d62eb2bef8f3d91172d30d599666", size = 13335518 },
{ url = "https://files.pythonhosted.org/packages/66/44/545f8a4d136830f08f4d24324e7db957c5374bf3a3f7a6c0bc7be4623a37/ruff-0.9.6-py3-none-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:d1bbc6808bf7b15796cef0815e1dfb796fbd383e7dbd4334709642649625e7c5", size = 12823287 },
{ url = "https://files.pythonhosted.org/packages/c5/26/8208ef9ee7431032c143649a9967c3ae1aae4257d95e6f8519f07309aa66/ruff-0.9.6-py3-none-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:589d1d9f25b5754ff230dce914a174a7c951a85a4e9270613a2b74231fdac2f5", size = 14592374 },
{ url = "https://files.pythonhosted.org/packages/31/70/e917781e55ff39c5b5208bda384fd397ffd76605e68544d71a7e40944945/ruff-0.9.6-py3-none-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:dc61dd5131742e21103fbbdcad683a8813be0e3c204472d520d9a5021ca8b217", size = 12500173 },
{ url = "https://files.pythonhosted.org/packages/84/f5/e4ddee07660f5a9622a9c2b639afd8f3104988dc4f6ba0b73ffacffa9a8c/ruff-0.9.6-py3-none-musllinux_1_2_aarch64.whl", hash = "sha256:5e2d9126161d0357e5c8f30b0bd6168d2c3872372f14481136d13de9937f79b6", size = 11906555 },
{ url = "https://files.pythonhosted.org/packages/f1/2b/6ff2fe383667075eef8656b9892e73dd9b119b5e3add51298628b87f6429/ruff-0.9.6-py3-none-musllinux_1_2_armv7l.whl", hash = "sha256:68660eab1a8e65babb5229a1f97b46e3120923757a68b5413d8561f8a85d4897", size = 11538958 },
{ url = "https://files.pythonhosted.org/packages/3c/db/98e59e90de45d1eb46649151c10a062d5707b5b7f76f64eb1e29edf6ebb1/ruff-0.9.6-py3-none-musllinux_1_2_i686.whl", hash = "sha256:c4cae6c4cc7b9b4017c71114115db0445b00a16de3bcde0946273e8392856f08", size = 12117247 },
{ url = "https://files.pythonhosted.org/packages/ec/bc/54e38f6d219013a9204a5a2015c09e7a8c36cedcd50a4b01ac69a550b9d9/ruff-0.9.6-py3-none-musllinux_1_2_x86_64.whl", hash = "sha256:19f505b643228b417c1111a2a536424ddde0db4ef9023b9e04a46ed8a1cb4656", size = 12554647 },
{ url = "https://files.pythonhosted.org/packages/a5/7d/7b461ab0e2404293c0627125bb70ac642c2e8d55bf590f6fce85f508f1b2/ruff-0.9.6-py3-none-win32.whl", hash = "sha256:194d8402bceef1b31164909540a597e0d913c0e4952015a5b40e28c146121b5d", size = 9949214 },
{ url = "https://files.pythonhosted.org/packages/ee/30/c3cee10f915ed75a5c29c1e57311282d1a15855551a64795c1b2bbe5cf37/ruff-0.9.6-py3-none-win_amd64.whl", hash = "sha256:03482d5c09d90d4ee3f40d97578423698ad895c87314c4de39ed2af945633caa", size = 10999914 },
{ url = "https://files.pythonhosted.org/packages/e8/a8/d71f44b93e3aa86ae232af1f2126ca7b95c0f515ec135462b3e1f351441c/ruff-0.9.6-py3-none-win_arm64.whl", hash = "sha256:0e2bb706a2be7ddfea4a4af918562fdc1bcb16df255e5fa595bbd800ce322a5a", size = 10177499 },
]
[[package]]
name = "shellingham"
version = "1.5.4"
@ -1188,7 +1215,7 @@ source = { git = "https://github.com/pikers/tomlkit.git?branch=piker_pin#8e0239a
[[package]]
name = "tractor"
version = "0.1.0a6.dev0"
source = { directory = "../tractor" }
source = { editable = "../tractor" }
dependencies = [
{ name = "colorlog" },
{ name = "msgspec" },