Compare commits

..

2 Commits

Author SHA1 Message Date
Tyler Goodlet 134889b199 Add `pyperclip` as dev dep for `xonsh` clipboard interop 2025-02-21 16:02:12 -05:00
Tyler Goodlet 393723375c Add `visidata` as a dev-dep for introspecting time-series files 2025-02-18 18:30:58 -05:00
5 changed files with 97 additions and 241 deletions

View File

@ -273,7 +273,7 @@ async def _reconnect_forever(
nobsws._connected.set()
await trio.sleep_forever()
except HandshakeError:
log.exception('Retrying connection')
log.exception(f'Retrying connection')
# ws & nursery block ends
@ -359,8 +359,8 @@ async def open_autorecon_ws(
'''
JSONRPC response-request style machinery for transparent multiplexing
of msgs over a `NoBsWs`.
JSONRPC response-request style machinery for transparent multiplexing of msgs
over a NoBsWs.
'''
@ -377,82 +377,43 @@ async def open_jsonrpc_session(
url: str,
start_id: int = 0,
response_type: type = JSONRPCResult,
msg_recv_timeout: float = float('inf'),
# ^NOTE, since only `deribit` is using this jsonrpc stuff atm
# and options mkts are generally "slow moving"..
#
# FURTHER if we break the underlying ws connection then since we
# don't pass a `fixture` to the task that manages `NoBsWs`, i.e.
# `_reconnect_forever()`, the jsonrpc "transport pipe" get's
# broken and never restored with wtv init sequence is required to
# re-establish a working req-resp session.
request_type: Optional[type] = None,
request_hook: Optional[Callable] = None,
error_hook: Optional[Callable] = None,
) -> Callable[[str, dict], dict]:
'''
Init a json-RPC-over-websocket connection to the provided `url`.
A `json_rpc: Callable[[str, dict], dict` is delivered to the
caller for sending requests and a bg-`trio.Task` handles
processing of response msgs including error reporting/raising in
the parent/caller task.
'''
# NOTE, store all request msgs so we can raise errors on the
# caller side!
req_msgs: dict[int, dict] = {}
async with (
trio.open_nursery() as tn,
open_autorecon_ws(
url=url,
msg_recv_timeout=msg_recv_timeout,
) as ws
trio.open_nursery() as n,
open_autorecon_ws(url) as ws
):
rpc_id: Iterable[int] = count(start_id)
rpc_id: Iterable = count(start_id)
rpc_results: dict[int, dict] = {}
async def json_rpc(
method: str,
params: dict,
) -> dict:
async def json_rpc(method: str, params: dict) -> dict:
'''
perform a json rpc call and wait for the result, raise exception in
case of error field present on response
'''
nonlocal req_msgs
req_id: int = next(rpc_id)
msg = {
'jsonrpc': '2.0',
'id': req_id,
'id': next(rpc_id),
'method': method,
'params': params
}
_id = msg['id']
result = rpc_results[_id] = {
rpc_results[_id] = {
'result': None,
'error': None,
'event': trio.Event(), # signal caller resp arrived
'event': trio.Event()
}
req_msgs[_id] = msg
await ws.send_msg(msg)
# wait for reponse before unblocking requester code
await rpc_results[_id]['event'].wait()
if (maybe_result := result['result']):
ret = maybe_result
del rpc_results[_id]
ret = rpc_results[_id]['result']
else:
err = result['error']
raise Exception(
f'JSONRPC request failed\n'
f'req: {msg}\n'
f'resp: {err}\n'
)
del rpc_results[_id]
if ret.error is not None:
raise Exception(json.dumps(ret.error, indent=4))
@ -467,7 +428,6 @@ async def open_jsonrpc_session(
the server side.
'''
nonlocal req_msgs
async for msg in ws:
match msg:
case {
@ -491,28 +451,19 @@ async def open_jsonrpc_session(
'params': _,
}:
log.debug(f'Recieved\n{msg}')
if request_hook:
await request_hook(request_type(**msg))
case {
'error': error
}:
# retreive orig request msg, set error
# response in original "result" msg,
# THEN FINALLY set the event to signal caller
# to raise the error in the parent task.
req_id: int = error['id']
req_msg: dict = req_msgs[req_id]
result: dict = rpc_results[req_id]
result['error'] = error
result['event'].set()
log.error(
f'JSONRPC request failed\n'
f'req: {req_msg}\n'
f'resp: {error}\n'
)
log.warning(f'Recieved\n{error}')
if error_hook:
await error_hook(response_type(**msg))
case _:
log.warning(f'Unhandled JSON-RPC msg!?\n{msg}')
tn.start_soon(recv_task)
n.start_soon(recv_task)
yield json_rpc
tn.cancel_scope.cancel()
n.cancel_scope.cancel()

View File

@ -386,8 +386,6 @@ def ldshm(
open_annot_ctl() as actl,
):
shm_df: pl.DataFrame | None = None
tf2aids: dict[float, dict] = {}
for (
shmfile,
shm,
@ -528,17 +526,16 @@ def ldshm(
new_df,
step_gaps,
)
# last chance manual overwrites in REPL
# await tractor.pause()
await tractor.pause()
assert aids
tf2aids[period_s] = aids
else:
# allow interaction even when no ts problems.
assert not diff
await tractor.pause()
# assert not diff
await tractor.pause()
log.info('Exiting TSP shm anal-izer!')
if shm_df is None:
log.error(

View File

@ -161,13 +161,7 @@ class NativeStorageClient:
def index_files(self):
for path in self._datadir.iterdir():
if (
path.is_dir()
or
'.parquet' not in str(path)
# or
# path.name in {'borked', 'expired',}
):
if path.name in {'borked', 'expired',}:
continue
key: str = path.name.rstrip('.parquet')

View File

@ -44,10 +44,8 @@ import trio
from trio_typing import TaskStatus
import tractor
from pendulum import (
Interval,
DateTime,
Duration,
duration as mk_duration,
from_timestamp,
)
import numpy as np
@ -216,8 +214,7 @@ async def maybe_fill_null_segments(
# pair, immediately stop backfilling?
if (
start_dt
and
end_dt < start_dt
and end_dt < start_dt
):
await tractor.pause()
break
@ -265,7 +262,6 @@ async def maybe_fill_null_segments(
except tractor.ContextCancelled:
# log.exception
await tractor.pause()
raise
null_segs_detected.set()
# RECHECK for more null-gaps
@ -353,7 +349,7 @@ async def maybe_fill_null_segments(
async def start_backfill(
get_hist,
def_frame_duration: Duration,
frame_types: dict[str, Duration] | None,
mod: ModuleType,
mkt: MktPair,
shm: ShmArray,
@ -383,23 +379,22 @@ async def start_backfill(
update_start_on_prepend: bool = False
if backfill_until_dt is None:
# TODO: per-provider default history-durations?
# -[ ] inside the `open_history_client()` config allow
# declaring the history duration limits instead of
# guessing and/or applying the same limits to all?
#
# -[ ] allow declaring (default) per-provider backfill
# limits inside a [storage] sub-section in conf.toml?
#
# NOTE, when no tsdb "last datum" is provided, we just
# load some near-term history by presuming a "decently
# large" 60s duration limit and a much shorter 1s range.
# TODO: drop this right and just expose the backfill
# limits inside a [storage] section in conf.toml?
# when no tsdb "last datum" is provided, we just load
# some near-term history.
# periods = {
# 1: {'days': 1},
# 60: {'days': 14},
# }
# do a decently sized backfill and load it into storage.
periods = {
1: {'days': 2},
60: {'years': 6},
}
period_duration: int = periods[timeframe]
update_start_on_prepend: bool = True
update_start_on_prepend = True
# NOTE: manually set the "latest" datetime which we intend to
# backfill history "until" so as to adhere to the history
@ -421,6 +416,7 @@ async def start_backfill(
f'backfill_until_dt: {backfill_until_dt}\n'
f'last_start_dt: {last_start_dt}\n'
)
try:
(
array,
@ -430,114 +426,71 @@ async def start_backfill(
timeframe,
end_dt=last_start_dt,
)
except NoData as _daterr:
orig_last_start_dt: datetime = last_start_dt
gap_report: str = (
f'EMPTY FRAME for `end_dt: {last_start_dt}`?\n'
f'{mod.name} -> tf@fqme: {timeframe}@{mkt.fqme}\n'
f'last_start_dt: {orig_last_start_dt}\n\n'
f'bf_until: {backfill_until_dt}\n'
)
# EMPTY FRAME signal with 3 (likely) causes:
#
# 1. range contains legit gap in venue history
# 2. history actually (edge case) **began** at the
# value `last_start_dt`
# 3. some other unknown error (ib blocking the
# history-query bc they don't want you seeing how
# they cucked all the tinas.. like with options
# hist)
#
if def_frame_duration:
# decrement by a duration's (frame) worth of time
# as maybe indicated by the backend to see if we
# can get older data before this possible
# "history gap".
last_start_dt: datetime = last_start_dt.subtract(
seconds=def_frame_duration.total_seconds()
# 3 cases:
# - frame in the middle of a legit venue gap
# - history actually began at the `last_start_dt`
# - some other unknown error (ib blocking the
# history bc they don't want you seeing how they
# cucked all the tinas..)
if dur := frame_types.get(timeframe):
# decrement by a frame's worth of duration and
# retry a few times.
last_start_dt.subtract(
seconds=dur.total_seconds()
)
gap_report += (
f'Decrementing `end_dt` and retrying with,\n'
f'def_frame_duration: {def_frame_duration}\n'
f'(new) last_start_dt: {last_start_dt}\n'
log.warning(
f'{mod.name} -> EMPTY FRAME for end_dt?\n'
f'tf@fqme: {timeframe}@{mkt.fqme}\n'
'bf_until <- last_start_dt:\n'
f'{backfill_until_dt} <- {last_start_dt}\n'
f'Decrementing `end_dt` by {dur} and retry..\n'
)
log.warning(gap_report)
# skip writing to shm/tsdb and try the next
# duration's worth of prior history.
continue
else:
# await tractor.pause()
raise DataUnavailable(gap_report)
# broker says there never was or is no more history to pull
except DataUnavailable as due:
message: str = due.args[0]
except DataUnavailable:
log.warning(
f'Provider {mod.name!r} halted backfill due to,\n\n'
f'{message}\n'
f'fqme: {mkt.fqme}\n'
f'timeframe: {timeframe}\n'
f'last_start_dt: {last_start_dt}\n'
f'bf_until: {backfill_until_dt}\n'
f'NO-MORE-DATA in range?\n'
f'`{mod.name}` halted history:\n'
f'tf@fqme: {timeframe}@{mkt.fqme}\n'
'bf_until <- last_start_dt:\n'
f'{backfill_until_dt} <- {last_start_dt}\n'
)
# UGH: what's a better way?
# TODO: backends are responsible for being correct on
# this right!?
# -[ ] in the `ib` case we could maybe offer some way
# to halt the request loop until the condition is
# resolved or should the backend be entirely in
# charge of solving such faults? yes, right?
# ugh, what's a better way?
# TODO: fwiw, we probably want a way to signal a throttle
# condition (eg. with ib) so that we can halt the
# request loop until the condition is resolved?
if timeframe > 1:
await tractor.pause()
return
time: np.ndarray = array['time']
assert (
time[0]
array['time'][0]
==
next_start_dt.timestamp()
)
assert time[-1] == next_end_dt.timestamp()
expected_dur: Interval = last_start_dt - next_start_dt
diff = last_start_dt - next_start_dt
frame_time_diff_s = diff.seconds
# frame's worth of sample-period-steps, in seconds
frame_size_s: float = len(array) * timeframe
recv_frame_dur: Duration = (
from_timestamp(array[-1]['time'])
-
from_timestamp(array[0]['time'])
)
if (
(lt_frame := (recv_frame_dur < expected_dur))
or
(null_frame := (frame_size_s == 0))
# ^XXX, should NEVER hit now!
):
expected_frame_size_s: float = frame_size_s + timeframe
if frame_time_diff_s > expected_frame_size_s:
# XXX: query result includes a start point prior to our
# expected "frame size" and thus is likely some kind of
# history gap (eg. market closed period, outage, etc.)
# so just report it to console for now.
if lt_frame:
reason = 'Possible GAP (or first-datum)'
else:
assert null_frame
reason = 'NULL-FRAME'
missing_dur: Interval = expected_dur.end - recv_frame_dur.end
log.warning(
f'{timeframe}s-series {reason} detected!\n'
f'fqme: {mkt.fqme}\n'
f'last_start_dt: {last_start_dt}\n\n'
f'recv interval: {recv_frame_dur}\n'
f'expected interval: {expected_dur}\n\n'
f'Missing duration of history of {missing_dur.in_words()!r}\n'
f'{missing_dur}\n'
'GAP DETECTED:\n'
f'last_start_dt: {last_start_dt}\n'
f'diff: {diff}\n'
f'frame_time_diff_s: {frame_time_diff_s}\n'
)
# await tractor.pause()
to_push = diff_history(
array,
@ -612,27 +565,22 @@ async def start_backfill(
# long-term storage.
if (
storage is not None
and
write_tsdb
and write_tsdb
):
log.info(
f'Writing {ln} frame to storage:\n'
f'{next_start_dt} -> {last_start_dt}'
)
# NOTE, always drop the src asset token for
# always drop the src asset token for
# non-currency-pair like market types (for now)
#
# THAT IS, for now our table key schema is NOT
# including the dst[/src] source asset token. SO,
# 'tsla.nasdaq.ib' over 'tsla/usd.nasdaq.ib' for
# historical reasons ONLY.
if mkt.dst.atype not in {
'crypto',
'crypto_currency',
'fiat', # a "forex pair"
'perpetual_future', # stupid "perps" from cex land
}:
# for now, our table key schema is not including
# the dst[/src] source asset token.
col_sym_key: str = mkt.get_fqme(
delim_char='',
without_src=True,
@ -737,7 +685,7 @@ async def back_load_from_tsdb(
last_tsdb_dt
and latest_start_dt
):
backfilled_size_s: Duration = (
backfilled_size_s = (
latest_start_dt - last_tsdb_dt
).seconds
# if the shm buffer len is not large enough to contain
@ -960,8 +908,6 @@ async def tsdb_backfill(
f'{pformat(config)}\n'
)
# concurrently load the provider's most-recent-frame AND any
# pre-existing tsdb history already saved in `piker` storage.
dt_eps: list[DateTime, DateTime] = []
async with trio.open_nursery() as tn:
tn.start_soon(
@ -972,6 +918,7 @@ async def tsdb_backfill(
timeframe,
config,
)
tsdb_entry: tuple = await load_tsdb_hist(
storage,
mkt,
@ -1000,25 +947,6 @@ async def tsdb_backfill(
mr_end_dt,
) = dt_eps
first_frame_dur_s: Duration = (mr_end_dt - mr_start_dt).seconds
calced_frame_size: Duration = mk_duration(
seconds=first_frame_dur_s,
)
# NOTE, attempt to use the backend declared default frame
# sizing (as allowed by their time-series query APIs) and
# if not provided try to construct a default from the
# first frame received above.
def_frame_durs: dict[
int,
Duration,
]|None = config.get('frame_types', None)
if def_frame_durs:
def_frame_size: Duration = def_frame_durs[timeframe]
assert def_frame_size == calced_frame_size
else:
# use what we calced from first frame above.
def_frame_size = calced_frame_size
# NOTE: when there's no offline data, there's 2 cases:
# - data backend doesn't support timeframe/sample
# period (in which case `dt_eps` should be `None` and
@ -1049,7 +977,7 @@ async def tsdb_backfill(
partial(
start_backfill,
get_hist=get_hist,
def_frame_duration=def_frame_size,
frame_types=config.get('frame_types', None),
mod=mod,
mkt=mkt,
shm=shm,

View File

@ -616,18 +616,6 @@ def detect_price_gaps(
# ])
...
# TODO: probably just use the null_segs impl above?
def detect_vlm_gaps(
df: pl.DataFrame,
col: str = 'volume',
) -> pl.DataFrame:
vnull: pl.DataFrame = w_dts.filter(
pl.col(col) == 0
)
return vnull
def dedupe(
src_df: pl.DataFrame,
@ -638,6 +626,7 @@ def dedupe(
) -> tuple[
pl.DataFrame, # with dts
pl.DataFrame, # gaps
pl.DataFrame, # with deduplicated dts (aka gap/repeat removal)
int, # len diff between input and deduped
]:
@ -650,22 +639,19 @@ def dedupe(
'''
wdts: pl.DataFrame = with_dts(src_df)
deduped = wdts
# remove duplicated datetime samples/sections
deduped: pl.DataFrame = wdts.unique(
# subset=['dt'],
subset=['time'],
maintain_order=True,
)
# maybe sort on any time field
if sort:
deduped = deduped.sort(by='time')
wdts = wdts.sort(by='time')
# TODO: detect out-of-order segments which were corrected!
# -[ ] report in log msg
# -[ ] possibly return segment sections which were moved?
# remove duplicated datetime samples/sections
deduped: pl.DataFrame = wdts.unique(
subset=['dt'],
maintain_order=True,
)
diff: int = (
wdts.height
-