Compare commits


2 Commits

Author SHA1 Message Date
Tyler Goodlet 134889b199 Add `pyperclip` as dev dep for `xonsh` clipboard interop 2025-02-21 16:02:12 -05:00
Tyler Goodlet 393723375c Add `visidata` as a dev-dep for introspecting time-series files 2025-02-18 18:30:58 -05:00
5 changed files with 97 additions and 241 deletions
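
Note: the two listed commits only add dev dependencies. As a minimal sketch of the interop they enable (tool invocations and the parquet path are assumptions, not shown in this compare):

    # introspect a time-series file from the shell via visidata:
    #   $ vd ~/.config/piker/nativedb/btcusdt.binance.1s.parquet
    #
    # clipboard interop from a `xonsh` REPL via pyperclip:
    import pyperclip

    pyperclip.copy('fqme: btcusdt.binance')
    assert pyperclip.paste() == 'fqme: btcusdt.binance'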

View File

@@ -273,7 +273,7 @@ async def _reconnect_forever(
                 nobsws._connected.set()
                 await trio.sleep_forever()
             except HandshakeError:
-                log.exception('Retrying connection')
+                log.exception(f'Retrying connection')

         # ws & nursery block ends
@@ -359,8 +359,8 @@ async def open_autorecon_ws(
     '''
-    JSONRPC response-request style machinery for transparent multiplexing
-    of msgs over a `NoBsWs`.
+    JSONRPC response-request style machinery for transparent multiplexing of msgs
+    over a NoBsWs.

     '''
@@ -377,82 +377,43 @@ async def open_jsonrpc_session(
     url: str,
     start_id: int = 0,
     response_type: type = JSONRPCResult,
-    msg_recv_timeout: float = float('inf'),
-    # ^NOTE, since only `deribit` is using this jsonrpc stuff atm
-    # and options mkts are generally "slow moving"..
-    #
-    # FURTHER if we break the underlying ws connection then since we
-    # don't pass a `fixture` to the task that manages `NoBsWs`, i.e.
-    # `_reconnect_forever()`, the jsonrpc "transport pipe" get's
-    # broken and never restored with wtv init sequence is required to
-    # re-establish a working req-resp session.
+    request_type: Optional[type] = None,
+    request_hook: Optional[Callable] = None,
+    error_hook: Optional[Callable] = None,

 ) -> Callable[[str, dict], dict]:
-    '''
-    Init a json-RPC-over-websocket connection to the provided `url`.
-
-    A `json_rpc: Callable[[str, dict], dict` is delivered to the
-    caller for sending requests and a bg-`trio.Task` handles
-    processing of response msgs including error reporting/raising in
-    the parent/caller task.
-
-    '''
-    # NOTE, store all request msgs so we can raise errors on the
-    # caller side!
-    req_msgs: dict[int, dict] = {}

     async with (
-        trio.open_nursery() as tn,
-        open_autorecon_ws(
-            url=url,
-            msg_recv_timeout=msg_recv_timeout,
-        ) as ws
+        trio.open_nursery() as n,
+        open_autorecon_ws(url) as ws
     ):
-        rpc_id: Iterable[int] = count(start_id)
+        rpc_id: Iterable = count(start_id)
         rpc_results: dict[int, dict] = {}

-        async def json_rpc(
-            method: str,
-            params: dict,
-        ) -> dict:
+        async def json_rpc(method: str, params: dict) -> dict:
             '''
             perform a json rpc call and wait for the result, raise exception in
             case of error field present on response
             '''
-            nonlocal req_msgs
-
-            req_id: int = next(rpc_id)
             msg = {
                 'jsonrpc': '2.0',
-                'id': req_id,
+                'id': next(rpc_id),
                 'method': method,
                 'params': params
             }
             _id = msg['id']

-            result = rpc_results[_id] = {
+            rpc_results[_id] = {
                 'result': None,
-                'error': None,
-                'event': trio.Event(),  # signal caller resp arrived
+                'event': trio.Event()
             }
-            req_msgs[_id] = msg

             await ws.send_msg(msg)

-            # wait for reponse before unblocking requester code
             await rpc_results[_id]['event'].wait()

-            if (maybe_result := result['result']):
-                ret = maybe_result
-                del rpc_results[_id]
+            ret = rpc_results[_id]['result']

-            else:
-                err = result['error']
-                raise Exception(
-                    f'JSONRPC request failed\n'
-                    f'req: {msg}\n'
-                    f'resp: {err}\n'
-                )
+            del rpc_results[_id]

             if ret.error is not None:
                 raise Exception(json.dumps(ret.error, indent=4))
@@ -467,7 +428,6 @@ async def open_jsonrpc_session(
         the server side.

         '''
-        nonlocal req_msgs
         async for msg in ws:
             match msg:
                 case {
@@ -491,28 +451,19 @@ async def open_jsonrpc_session(
                     'params': _,
                 }:
                     log.debug(f'Recieved\n{msg}')
+                    if request_hook:
+                        await request_hook(request_type(**msg))

                 case {
                     'error': error
                 }:
-                    # retreive orig request msg, set error
-                    # response in original "result" msg,
-                    # THEN FINALLY set the event to signal caller
-                    # to raise the error in the parent task.
-                    req_id: int = error['id']
-                    req_msg: dict = req_msgs[req_id]
-                    result: dict = rpc_results[req_id]
-                    result['error'] = error
-                    result['event'].set()
-                    log.error(
-                        f'JSONRPC request failed\n'
-                        f'req: {req_msg}\n'
-                        f'resp: {error}\n'
-                    )
+                    log.warning(f'Recieved\n{error}')
+                    if error_hook:
+                        await error_hook(response_type(**msg))

                 case _:
                     log.warning(f'Unhandled JSON-RPC msg!?\n{msg}')

-        tn.start_soon(recv_task)
+        n.start_soon(recv_task)
         yield json_rpc
-        tn.cancel_scope.cancel()
+        n.cancel_scope.cancel()
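
For context, a minimal usage sketch of `open_jsonrpc_session()` as reconstructed from the hunks above; the endpoint URL and method name are placeholders and the `piker.data._web_bs` import path is an assumption:

    import trio
    from piker.data._web_bs import open_jsonrpc_session  # assumed module path

    async def main():
        # an async-ctx-mngr which yields a `json_rpc(method, params)`
        # callable; a bg task receives responses and sets each
        # request's `trio.Event` to unblock the caller.
        async with open_jsonrpc_session(
            'wss://example.com/ws/api/v2',  # placeholder url
        ) as json_rpc:
            resp = await json_rpc(
                'public/get_time',  # placeholder method name
                {},
            )
            print(resp.result)

    trio.run(main)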

View File

@@ -386,8 +386,6 @@ def ldshm(
         open_annot_ctl() as actl,
     ):
         shm_df: pl.DataFrame | None = None
-        tf2aids: dict[float, dict] = {}
-
         for (
             shmfile,
             shm,
@@ -528,17 +526,16 @@ def ldshm(
                         new_df,
                         step_gaps,
                     )
                     # last chance manual overwrites in REPL
-                    # await tractor.pause()
+                    await tractor.pause()
                     assert aids
-                    tf2aids[period_s] = aids

                 else:
                     # allow interaction even when no ts problems.
-                    assert not diff
+                    await tractor.pause()
                     # assert not diff
-                    await tractor.pause()
-        log.info('Exiting TSP shm anal-izer!')

         if shm_df is None:
             log.error(

View File

@@ -161,13 +161,7 @@ class NativeStorageClient:
     def index_files(self):
         for path in self._datadir.iterdir():
-            if (
-                path.is_dir()
-                or
-                '.parquet' not in str(path)
-                # or
-                # path.name in {'borked', 'expired',}
-            ):
+            if path.name in {'borked', 'expired',}:
                 continue

             key: str = path.name.rstrip('.parquet')
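
A standalone sketch of the (minus side) parquet-only filter above, runnable outside the class; the datadir location is an assumption:

    from pathlib import Path

    datadir = Path('~/.config/piker/nativedb').expanduser()  # assumed location
    for path in datadir.iterdir():
        if (
            path.is_dir()                    # skip sub-dirs
            or '.parquet' not in str(path)   # keep only parquet files
        ):
            continue

        # NB: `str.rstrip()` strips a trailing char *set*, not a suffix
        key: str = path.name.rstrip('.parquet')
        print(key)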

View File

@@ -44,10 +44,8 @@ import trio
 from trio_typing import TaskStatus
 import tractor
 from pendulum import (
-    Interval,
     DateTime,
     Duration,
-    duration as mk_duration,
     from_timestamp,
 )
 import numpy as np
@@ -216,8 +214,7 @@ async def maybe_fill_null_segments(
         # pair, immediately stop backfilling?
         if (
             start_dt
-            and
-            end_dt < start_dt
+            and end_dt < start_dt
         ):
             await tractor.pause()
             break
@@ -265,7 +262,6 @@ async def maybe_fill_null_segments(
     except tractor.ContextCancelled:
         # log.exception
         await tractor.pause()
-        raise

     null_segs_detected.set()
     # RECHECK for more null-gaps
@@ -353,7 +349,7 @@ async def maybe_fill_null_segments(
 async def start_backfill(
     get_hist,
-    def_frame_duration: Duration,
+    frame_types: dict[str, Duration] | None,
     mod: ModuleType,
     mkt: MktPair,
     shm: ShmArray,
@@ -383,23 +379,22 @@ async def start_backfill(
     update_start_on_prepend: bool = False
     if backfill_until_dt is None:
-        # TODO: per-provider default history-durations?
-        # -[ ] inside the `open_history_client()` config allow
-        #     declaring the history duration limits instead of
-        #     guessing and/or applying the same limits to all?
-        #
-        # -[ ] allow declaring (default) per-provider backfill
-        #     limits inside a [storage] sub-section in conf.toml?
-        #
-        # NOTE, when no tsdb "last datum" is provided, we just
-        # load some near-term history by presuming a "decently
-        # large" 60s duration limit and a much shorter 1s range.
+        # TODO: drop this right and just expose the backfill
+        # limits inside a [storage] section in conf.toml?
+        # when no tsdb "last datum" is provided, we just load
+        # some near-term history.
+        # periods = {
+        #     1: {'days': 1},
+        #     60: {'days': 14},
+        # }
+
+        # do a decently sized backfill and load it into storage.
         periods = {
             1: {'days': 2},
             60: {'years': 6},
         }
         period_duration: int = periods[timeframe]
-        update_start_on_prepend: bool = True
+        update_start_on_prepend = True

     # NOTE: manually set the "latest" datetime which we intend to
     # backfill history "until" so as to adhere to the history
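
As a worked sketch of the default limits above: with no tsdb "last datum", a 60s feed gets ~6 years of backfill while a 1s feed gets ~2 days (pendulum usage assumed; `now()` stands in for the shm buffer's last datum time):

    import pendulum

    periods = {
        1: {'days': 2},     # 1s bars -> ~2 days of history
        60: {'years': 6},   # 60s bars -> ~6 years
    }
    timeframe = 60
    # the "until" boundary the backfill loop walks back to:
    backfill_until_dt = pendulum.now('UTC').subtract(**periods[timeframe])
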
@@ -421,6 +416,7 @@ async def start_backfill(
                 f'backfill_until_dt: {backfill_until_dt}\n'
                 f'last_start_dt: {last_start_dt}\n'
             )
+
             try:
                 (
                     array,
@@ -430,114 +426,71 @@ async def start_backfill(
                     timeframe,
                     end_dt=last_start_dt,
                 )

             except NoData as _daterr:
-                orig_last_start_dt: datetime = last_start_dt
-                gap_report: str = (
-                    f'EMPTY FRAME for `end_dt: {last_start_dt}`?\n'
-                    f'{mod.name} -> tf@fqme: {timeframe}@{mkt.fqme}\n'
-                    f'last_start_dt: {orig_last_start_dt}\n\n'
-                    f'bf_until: {backfill_until_dt}\n'
-                )
-                # EMPTY FRAME signal with 3 (likely) causes:
-                #
-                # 1. range contains legit gap in venue history
-                # 2. history actually (edge case) **began** at the
-                #    value `last_start_dt`
-                # 3. some other unknown error (ib blocking the
-                #    history-query bc they don't want you seeing how
-                #    they cucked all the tinas.. like with options
-                #    hist)
-                #
-                if def_frame_duration:
-                    # decrement by a duration's (frame) worth of time
-                    # as maybe indicated by the backend to see if we
-                    # can get older data before this possible
-                    # "history gap".
-                    last_start_dt: datetime = last_start_dt.subtract(
-                        seconds=def_frame_duration.total_seconds()
+                # 3 cases:
+                # - frame in the middle of a legit venue gap
+                # - history actually began at the `last_start_dt`
+                # - some other unknown error (ib blocking the
+                #   history bc they don't want you seeing how they
+                #   cucked all the tinas..)
+                if dur := frame_types.get(timeframe):
+                    # decrement by a frame's worth of duration and
+                    # retry a few times.
+                    last_start_dt.subtract(
+                        seconds=dur.total_seconds()
                     )
-                    gap_report += (
-                        f'Decrementing `end_dt` and retrying with,\n'
-                        f'def_frame_duration: {def_frame_duration}\n'
-                        f'(new) last_start_dt: {last_start_dt}\n'
+                    log.warning(
+                        f'{mod.name} -> EMPTY FRAME for end_dt?\n'
+                        f'tf@fqme: {timeframe}@{mkt.fqme}\n'
+                        'bf_until <- last_start_dt:\n'
+                        f'{backfill_until_dt} <- {last_start_dt}\n'
+                        f'Decrementing `end_dt` by {dur} and retry..\n'
                     )
-                    log.warning(gap_report)
-                    # skip writing to shm/tsdb and try the next
-                    # duration's worth of prior history.
                     continue

-                else:
-                    # await tractor.pause()
-                    raise DataUnavailable(gap_report)

             # broker says there never was or is no more history to pull
-            except DataUnavailable as due:
-                message: str = due.args[0]
+            except DataUnavailable:
                 log.warning(
-                    f'Provider {mod.name!r} halted backfill due to,\n\n'
-
-                    f'{message}\n'
-
-                    f'fqme: {mkt.fqme}\n'
-                    f'timeframe: {timeframe}\n'
-                    f'last_start_dt: {last_start_dt}\n'
-                    f'bf_until: {backfill_until_dt}\n'
+                    f'NO-MORE-DATA in range?\n'
+                    f'`{mod.name}` halted history:\n'
+                    f'tf@fqme: {timeframe}@{mkt.fqme}\n'
+                    'bf_until <- last_start_dt:\n'
+                    f'{backfill_until_dt} <- {last_start_dt}\n'
                 )
-                # UGH: what's a better way?
-                # TODO: backends are responsible for being correct on
-                # this right!?
-                # -[ ] in the `ib` case we could maybe offer some way
-                #    to halt the request loop until the condition is
-                #    resolved or should the backend be entirely in
-                #    charge of solving such faults? yes, right?
+
+                # ugh, what's a better way?
+                # TODO: fwiw, we probably want a way to signal a throttle
+                # condition (eg. with ib) so that we can halt the
+                # request loop until the condition is resolved?
+                if timeframe > 1:
+                    await tractor.pause()
                 return

-            time: np.ndarray = array['time']
             assert (
-                time[0]
+                array['time'][0]
                 ==
                 next_start_dt.timestamp()
             )
-            assert time[-1] == next_end_dt.timestamp()

-            expected_dur: Interval = last_start_dt - next_start_dt
+            diff = last_start_dt - next_start_dt
+            frame_time_diff_s = diff.seconds

             # frame's worth of sample-period-steps, in seconds
             frame_size_s: float = len(array) * timeframe
-            recv_frame_dur: Duration = (
-                from_timestamp(array[-1]['time'])
-                -
-                from_timestamp(array[0]['time'])
-            )
-            if (
-                (lt_frame := (recv_frame_dur < expected_dur))
-                or
-                (null_frame := (frame_size_s == 0))
-                # ^XXX, should NEVER hit now!
-            ):
+            expected_frame_size_s: float = frame_size_s + timeframe
+            if frame_time_diff_s > expected_frame_size_s:
+
                 # XXX: query result includes a start point prior to our
                 # expected "frame size" and thus is likely some kind of
                 # history gap (eg. market closed period, outage, etc.)
                 # so just report it to console for now.
-                if lt_frame:
-                    reason = 'Possible GAP (or first-datum)'
-                else:
-                    assert null_frame
-                    reason = 'NULL-FRAME'
-
-                missing_dur: Interval = expected_dur.end - recv_frame_dur.end
                 log.warning(
-                    f'{timeframe}s-series {reason} detected!\n'
-                    f'fqme: {mkt.fqme}\n'
-                    f'last_start_dt: {last_start_dt}\n\n'
-                    f'recv interval: {recv_frame_dur}\n'
-                    f'expected interval: {expected_dur}\n\n'
-                    f'Missing duration of history of {missing_dur.in_words()!r}\n'
-                    f'{missing_dur}\n'
+                    'GAP DETECTED:\n'
+                    f'last_start_dt: {last_start_dt}\n'
+                    f'diff: {diff}\n'
+                    f'frame_time_diff_s: {frame_time_diff_s}\n'
                 )
-                # await tractor.pause()

             to_push = diff_history(
                 array,
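
For reference, the minus-side gap check above in isolation; a runnable sketch with fabricated inputs (subtracting two pendulum `DateTime`s yields a comparable interval):

    from pendulum import from_timestamp

    # fabricated example frame: 3 x 60s samples
    times = [1_700_000_000, 1_700_000_060, 1_700_000_120]

    next_start_dt = from_timestamp(times[0])
    last_start_dt = from_timestamp(times[-1] + 60 * 60)  # pretend a 1hr hole

    expected_dur = last_start_dt - next_start_dt
    recv_frame_dur = from_timestamp(times[-1]) - from_timestamp(times[0])

    if recv_frame_dur < expected_dur:
        print('Possible GAP (or first-datum)')
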
@@ -612,27 +565,22 @@ async def start_backfill(
             # long-term storage.
             if (
                 storage is not None
-                and
-                write_tsdb
+                and write_tsdb
             ):
                 log.info(
                     f'Writing {ln} frame to storage:\n'
                     f'{next_start_dt} -> {last_start_dt}'
                 )

-                # NOTE, always drop the src asset token for
+                # always drop the src asset token for
                 # non-currency-pair like market types (for now)
-                #
-                # THAT IS, for now our table key schema is NOT
-                # including the dst[/src] source asset token. SO,
-                # 'tsla.nasdaq.ib' over 'tsla/usd.nasdaq.ib' for
-                # historical reasons ONLY.
                 if mkt.dst.atype not in {
                     'crypto',
                     'crypto_currency',
                     'fiat',  # a "forex pair"
-                    'perpetual_future',  # stupid "perps" from cex land
                 }:
+                    # for now, our table key schema is not including
+                    # the dst[/src] source asset token.
                     col_sym_key: str = mkt.get_fqme(
                         delim_char='',
                         without_src=True,
@@ -737,7 +685,7 @@ async def back_load_from_tsdb(
         last_tsdb_dt
         and latest_start_dt
     ):
-        backfilled_size_s: Duration = (
+        backfilled_size_s = (
             latest_start_dt - last_tsdb_dt
         ).seconds
         # if the shm buffer len is not large enough to contain
@@ -960,8 +908,6 @@ async def tsdb_backfill(
             f'{pformat(config)}\n'
         )

-        # concurrently load the provider's most-recent-frame AND any
-        # pre-existing tsdb history already saved in `piker` storage.
         dt_eps: list[DateTime, DateTime] = []
         async with trio.open_nursery() as tn:
             tn.start_soon(
@@ -972,6 +918,7 @@ async def tsdb_backfill(
                 timeframe,
                 config,
             )
+
             tsdb_entry: tuple = await load_tsdb_hist(
                 storage,
                 mkt,
@@ -1000,25 +947,6 @@ async def tsdb_backfill(
                 mr_end_dt,
             ) = dt_eps

-            first_frame_dur_s: Duration = (mr_end_dt - mr_start_dt).seconds
-            calced_frame_size: Duration = mk_duration(
-                seconds=first_frame_dur_s,
-            )
-            # NOTE, attempt to use the backend declared default frame
-            # sizing (as allowed by their time-series query APIs) and
-            # if not provided try to construct a default from the
-            # first frame received above.
-            def_frame_durs: dict[
-                int,
-                Duration,
-            ]|None = config.get('frame_types', None)
-            if def_frame_durs:
-                def_frame_size: Duration = def_frame_durs[timeframe]
-                assert def_frame_size == calced_frame_size
-            else:
-                # use what we calced from first frame above.
-                def_frame_size = calced_frame_size
-
             # NOTE: when there's no offline data, there's 2 cases:
             # - data backend doesn't support timeframe/sample
             #   period (in which case `dt_eps` should be `None` and
@@ -1049,7 +977,7 @@ async def tsdb_backfill(
                 partial(
                     start_backfill,
                     get_hist=get_hist,
-                    def_frame_duration=def_frame_size,
+                    frame_types=config.get('frame_types', None),
                     mod=mod,
                     mkt=mkt,
                     shm=shm,
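
The plus-side `frame_types` value is read straight from the provider's history-client config; a hypothetical backend entry might look like the following (shape assumed from the hunks above, durations invented):

    from pendulum import duration

    # hypothetical `open_history_client()` config for a backend
    config = {
        'frame_types': {
            # max frame duration per timeframe the venue will serve
            1: duration(seconds=2000),
            60: duration(days=2),
        },
    }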

View File

@@ -616,18 +616,6 @@ def detect_price_gaps(
     # ])
     ...

-# TODO: probably just use the null_segs impl above?
-def detect_vlm_gaps(
-    df: pl.DataFrame,
-    col: str = 'volume',
-
-) -> pl.DataFrame:
-
-    vnull: pl.DataFrame = w_dts.filter(
-        pl.col(col) == 0
-    )
-    return vnull
-

 def dedupe(
     src_df: pl.DataFrame,
@@ -638,6 +626,7 @@ def dedupe(
 ) -> tuple[
     pl.DataFrame,  # with dts
+    pl.DataFrame,  # gaps
     pl.DataFrame,  # with deduplicated dts (aka gap/repeat removal)
     int,  # len diff between input and deduped
 ]:
@@ -650,22 +639,19 @@ def dedupe(
     '''
     wdts: pl.DataFrame = with_dts(src_df)

-    deduped = wdts
-
-    # remove duplicated datetime samples/sections
-    deduped: pl.DataFrame = wdts.unique(
-        # subset=['dt'],
-        subset=['time'],
-        maintain_order=True,
-    )

     # maybe sort on any time field
     if sort:
-        deduped = deduped.sort(by='time')
+        wdts = wdts.sort(by='time')
         # TODO: detect out-of-order segments which were corrected!
         # -[ ] report in log msg
         # -[ ] possibly return segment sections which were moved?

+    # remove duplicated datetime samples/sections
+    deduped: pl.DataFrame = wdts.unique(
+        subset=['dt'],
+        maintain_order=True,
+    )
+
     diff: int = (
         wdts.height
         -
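
A self-contained sketch of the dedup step above (minus-side variant keyed on the 'time' column); polars' `DataFrame.unique()` with `maintain_order=True` keeps first-seen row order:

    import polars as pl

    df = pl.DataFrame({
        'time':  [1, 2, 2, 3],            # one repeated sample
        'close': [10.0, 10.5, 10.5, 11.0],
    })
    deduped: pl.DataFrame = df.unique(
        subset=['time'],
        maintain_order=True,
    )
    diff: int = df.height - deduped.height
    assert diff == 1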