Compare commits

..

10 Commits

Author SHA1 Message Date
Tyler Goodlet e2e3e30d7a Attempt to keep selected item highlighted
This attempt was unsuccessful since trying to (re)select the last
highlighted item on both an "enter" or "click" of that item causes
a hang and then segfault in `Qt`; no clue why..

Adds a `keep_current_item_selected: bool` flag to
`CompleterView.show_cache_entries()` but using it seems to always cause
a hang and crash; we keep all potential use spots commented for now
obviously to avoid this. Also included is a bunch of tidying to logic
blocks in the kb-control loop for readability.
2023-01-10 12:42:26 -05:00
Tyler Goodlet dd292b3652 Don't raise on quote feed lags to dark clearing loop 2023-01-10 12:42:26 -05:00
Tyler Goodlet 140d21c179 Lol, pull hist chart from the display state 2023-01-10 12:42:26 -05:00
Tyler Goodlet 1412c435fd Make (cache) search-results a `set` and avoid overlay duplicate entries 2023-01-10 12:42:26 -05:00
Tyler Goodlet acef3505fd Move sync log msg back to info 2023-01-10 12:42:26 -05:00
Tyler Goodlet bfeebba734 Take outer-interval values in `Viz.datums_range()` 2023-01-10 12:42:26 -05:00
Tyler Goodlet e2a299fe6c Clean a buncha cruft from render mod 2023-01-10 12:42:26 -05:00
Tyler Goodlet 9c46b92ce7 Don't deliver shms from `start_backfill()`, they're not used 2023-01-10 12:42:26 -05:00
Tyler Goodlet 9a0605e405 `deribit`: drop old `backfill_bars()` ep 2023-01-10 12:42:26 -05:00
Tyler Goodlet ae6d5b07e7 `kraken`: only do unsub if connected
Trying to send a message in the `NoBsWs.fixture()` exit when the ws is
not currently disconnected causes a double `._stack.close()` call which
will corrupt `trio`'s coro stack. Instead only do the unsub if we detect
the ws is still up.

Also drops the legacy `backfill_bars()` module endpoint.

Fixes #437
2023-01-10 12:42:26 -05:00
10 changed files with 152 additions and 148 deletions

View File

@ -94,21 +94,6 @@ async def open_history_client(
yield get_ohlc, {'erlangs': 3, 'rate': 3}
async def backfill_bars(
symbol: str,
shm: ShmArray, # type: ignore # noqa
task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,
) -> None:
"""Fill historical bars into shared mem / storage afap.
"""
instrument = symbol
with trio.CancelScope() as cs:
async with open_cached_client('deribit') as client:
bars = await client.bars(instrument)
shm.push(bars)
task_status.started(cs)
async def stream_quotes(
send_chan: trio.abc.SendChannel,

View File

@ -303,24 +303,6 @@ async def open_history_client(
yield get_ohlc, {'erlangs': 1, 'rate': 1}
async def backfill_bars(
sym: str,
shm: ShmArray, # type: ignore # noqa
count: int = 10, # NOTE: any more and we'll overrun the underlying buffer
task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,
) -> None:
'''
Fill historical bars into shared mem / storage afap.
'''
with trio.CancelScope() as cs:
async with open_cached_client('kraken') as client:
bars = await client.bars(symbol=sym)
shm.push(bars)
task_status.started(cs)
async def stream_quotes(
send_chan: trio.abc.SendChannel,
@ -419,14 +401,15 @@ async def stream_quotes(
yield
# unsub from all pairs on teardown
await ws.send_msg({
'pair': list(ws_pairs.values()),
'event': 'unsubscribe',
'subscription': ['ohlc', 'spread'],
})
if ws.connected():
await ws.send_msg({
'pair': list(ws_pairs.values()),
'event': 'unsubscribe',
'subscription': ['ohlc', 'spread'],
})
# XXX: do we need to ack the unsub?
# await ws.recv_msg()
# XXX: do we need to ack the unsub?
# await ws.recv_msg()
# see the tips on reconnection logic:
# https://support.kraken.com/hc/en-us/articles/360044504011-WebSocket-API-unexpected-disconnections-from-market-data-feeds

View File

@ -172,6 +172,7 @@ async def clear_dark_triggers(
# TODO:
# - numba all this!
# - this stream may eventually contain multiple symbols
quote_stream._raise_on_lag = False
async for quotes in quote_stream:
# start = time.time()
for sym, quote in quotes.items():

View File

@ -414,7 +414,9 @@ async def start_backfill(
and starts[next_start_dt] <= 6
):
start_dt = min(starts)
print(f"SKIPPING DUPLICATE FRAME @ {next_start_dt}")
log.warning(
f"{bfqsn}: skipping duplicate frame @ {next_start_dt}"
)
starts[start_dt] += 1
continue
@ -616,10 +618,7 @@ async def tsdb_backfill(
# unblock the feed bus management task
# assert len(shms[1].array)
task_status.started((
shms[60],
shms[1],
))
task_status.started()
async def back_load_from_tsdb(
timeframe: int,
@ -868,10 +867,7 @@ async def manage_history(
marketstore.open_storage_client(fqsn)as storage,
):
# TODO: drop returning the output that we pass in?
(
hist_shm,
rt_shm,
) = await bus.nursery.start(
await bus.nursery.start(
tsdb_backfill,
mod,
marketstore,

View File

@ -21,7 +21,9 @@ core task logic for processing chains
from dataclasses import dataclass
from functools import partial
from typing import (
AsyncIterator, Callable, Optional,
AsyncIterator,
Callable,
Optional,
Union,
)
@ -386,7 +388,7 @@ async def cascade(
) -> tuple[TaskTracker, int]:
# TODO: adopt an incremental update engine/approach
# where possible here eventually!
log.debug(f're-syncing fsp {func_name} to source')
log.info(f're-syncing fsp {func_name} to source')
tracker.cs.cancel()
await tracker.complete.wait()
tracker, index = await n.start(fsp_target)
@ -429,6 +431,7 @@ async def cascade(
tracker: TaskTracker,
src: ShmArray,
dst: ShmArray,
) -> tuple[TaskTracker, int]:
synced, step_diff, _ = is_synced(src, dst)
@ -444,7 +447,8 @@ async def cascade(
# signal
times = src.array['time']
if len(times) > 1:
delay_s = float(times[-1] - times[times != times[-1]][-1])
last_ts = times[-1]
delay_s = float(last_ts - times[times != last_ts][-1])
else:
# our default "HFT" sample rate.
delay_s = _default_delay_s

View File

@ -127,7 +127,10 @@ class GodWidget(QWidget):
# self.init_strategy_ui()
# self.vbox.addLayout(self.hbox)
self._chart_cache: dict[str, LinkedSplits] = {}
self._chart_cache: dict[
str,
tuple[LinkedSplits, LinkedSplits],
] = {}
self.hist_linked: Optional[LinkedSplits] = None
self.rt_linked: Optional[LinkedSplits] = None
@ -147,23 +150,6 @@ class GodWidget(QWidget):
def linkedsplits(self) -> LinkedSplits:
return self.rt_linked
# def init_timeframes_ui(self):
# self.tf_layout = QHBoxLayout()
# self.tf_layout.setSpacing(0)
# self.tf_layout.setContentsMargins(0, 12, 0, 0)
# time_frames = ('1M', '5M', '15M', '30M', '1H', '1D', '1W', 'MN')
# btn_prefix = 'TF'
# for tf in time_frames:
# btn_name = ''.join([btn_prefix, tf])
# btn = QtWidgets.QPushButton(tf)
# # TODO:
# btn.setEnabled(False)
# setattr(self, btn_name, btn)
# self.tf_layout.addWidget(btn)
# self.toolbar_layout.addLayout(self.tf_layout)
# XXX: strat loader/saver that we don't need yet.
# def init_strategy_ui(self):
# self.strategy_box = StrategyBoxWidget(self)

View File

@ -19,6 +19,10 @@ Data vizualization APIs
'''
from __future__ import annotations
from math import (
ceil,
floor,
)
from typing import (
Optional,
Literal,
@ -456,13 +460,13 @@ class Viz(msgspec.Struct): # , frozen=True):
array = self.shm.array
index = array[index_field]
first = round(index[0])
last = round(index[-1])
first = floor(index[0])
last = ceil(index[-1])
# first and last datums in view determined by
# l / r view range.
leftmost = round(l)
rightmost = round(r)
leftmost = floor(l)
rightmost = ceil(r)
# invalid view state
if (

View File

@ -457,7 +457,7 @@ def graphics_update_cycle(
# state-tracking ``chart_maxmin()`` routine from above?
chart = ds.chart
hist_chart = ds.godwidget.hist_linked.chart
hist_chart = ds.hist_chart
flume = ds.flume
sym = flume.symbol

View File

@ -24,7 +24,6 @@ for fast incremental update.
'''
from __future__ import annotations
from typing import (
Optional,
TYPE_CHECKING,
)
@ -61,17 +60,6 @@ class Renderer(msgspec.Struct):
path: QPainterPath | None = None
fast_path: QPainterPath | None = None
# XXX: just ideas..
# called on the final data (transform) output to convert
# to "graphical data form" a format that can be passed to
# the ``.draw()`` implementation.
# graphics_t: Optional[Callable[ShmArray, np.ndarray]] = None
# graphics_t_shm: Optional[ShmArray] = None
# path graphics update implementation methods
# prepend_fn: Optional[Callable[QPainterPath, QPainterPath]] = None
# append_fn: Optional[Callable[QPainterPath, QPainterPath]] = None
# downsampling state
_last_uppx: float = 0
_in_ds: bool = False
@ -218,7 +206,7 @@ class Renderer(msgspec.Struct):
):
# print(f"{self.viz.name} -> REDRAWING BRUH")
if new_sample_rate and showing_src_data:
log.info(f'DEDOWN -> {array_key}')
log.info(f'DE-downsampling -> {array_key}')
self._in_ds = False
elif should_ds and uppx > 1:
@ -269,10 +257,7 @@ class Renderer(msgspec.Struct):
append_length > 0
and do_append
):
print(f'{array_key} append len: {append_length}')
# new_x = x_1d[-append_length - 2:] # slice_to_head]
# new_y = y_1d[-append_length - 2:] # slice_to_head]
profiler('sliced append path')
profiler(f'sliced append path {append_length}')
# (
# x_1d,
# y_1d,
@ -300,22 +285,23 @@ class Renderer(msgspec.Struct):
profiler('generated append qpath')
if use_fpath:
# print(f'{self.viz.name}: FAST PATH')
# an attempt at trying to make append-updates faster..
if fast_path is None:
fast_path = append_path
# fast_path.reserve(int(6e3))
else:
# print(
# f'{self.viz.name}: FAST PATH\n'
# f"append_path br: {append_path.boundingRect()}\n"
# f"path size: {size}\n"
# f"append_path len: {append_path.length()}\n"
# f"fast_path len: {fast_path.length()}\n"
# )
fast_path.connectPath(append_path)
size = fast_path.capacity()
profiler(f'connected fast path w size: {size}')
print(
f"append_path br: {append_path.boundingRect()}\n"
f"path size: {size}\n"
f"append_path len: {append_path.length()}\n"
f"fast_path len: {fast_path.length()}\n"
)
# graphics.path.moveTo(new_x[0], new_y[0])
# path.connectPath(append_path)

View File

@ -144,15 +144,29 @@ class CompleterView(QTreeView):
self._font_size: int = 0 # pixels
self._init: bool = False
async def on_pressed(self, idx: QModelIndex) -> None:
async def on_pressed(
self,
idx: QModelIndex,
) -> None:
'''
Mouse pressed on view handler.
'''
search = self.parent()
await search.chart_current_item()
await search.chart_current_item(
clear_to_cache=True,
)
# XXX: this causes Qt to hang and segfault..lovely
# self.show_cache_entries(
# only=True,
# keep_current_item_selected=True,
# )
search.focus()
def set_font_size(self, size: int = 18):
# print(size)
if size < 0:
@ -288,7 +302,7 @@ class CompleterView(QTreeView):
def select_first(self) -> QStandardItem:
'''
Select the first depth >= 2 entry from the completer tree and
return it's item.
return its item.
'''
# ensure we're **not** selecting the first level parent node and
@ -615,6 +629,8 @@ class SearchWidget(QtWidgets.QWidget):
def show_cache_entries(
self,
only: bool = False,
keep_current_item_selected: bool = False,
) -> None:
'''
Clear the search results view and show only cached (aka recently
@ -624,10 +640,14 @@ class SearchWidget(QtWidgets.QWidget):
godw = self.godwidget
# first entry in the cache is the current symbol(s)
fqsns = []
fqsns = set()
for multi_fqsns in list(godw._chart_cache):
fqsns.extend(list(multi_fqsns))
for fqsn in set(multi_fqsns):
fqsns.add(fqsn)
if keep_current_item_selected:
sel = self.view.selectionModel()
cidx = sel.currentIndex()
self.view.set_section_entries(
'cache',
@ -637,7 +657,17 @@ class SearchWidget(QtWidgets.QWidget):
reverse=True,
)
def get_current_item(self) -> Optional[tuple[str, str]]:
if (
keep_current_item_selected
and cidx.isValid()
):
# set current selection back to what it was before filling out
# the view results.
self.view.select_from_idx(cidx)
else:
self.view.select_first()
def get_current_item(self) -> tuple[QModelIndex, str, str] | None:
'''
Return the current completer tree selection as
a tuple ``(parent: str, child: str)`` if valid, else ``None``.
@ -665,7 +695,11 @@ class SearchWidget(QtWidgets.QWidget):
if provider == 'cache':
symbol, _, provider = symbol.rpartition('.')
return provider, symbol
return (
cidx,
provider,
symbol,
)
else:
return None
@ -686,7 +720,7 @@ class SearchWidget(QtWidgets.QWidget):
if value is None:
return None
provider, symbol = value
cidx, provider, symbol = value
godw = self.godwidget
fqsn = f'{symbol}.{provider}'
@ -715,7 +749,9 @@ class SearchWidget(QtWidgets.QWidget):
godw.rt_linked,
)
)
self.show_cache_entries(only=True)
self.show_cache_entries(
only=True,
)
self.bar.focus()
return fqsn
@ -956,11 +992,10 @@ async def handle_keyboard_input(
global _search_active, _search_enabled
# startup
bar = searchbar
search = searchbar.parent()
godwidget = search.godwidget
view = bar.view
view.set_font_size(bar.dpi_font.px_size)
searchw = searchbar.parent()
godwidget = searchw.godwidget
view = searchbar.view
view.set_font_size(searchbar.dpi_font.px_size)
send, recv = trio.open_memory_channel(616)
async with trio.open_nursery() as n:
@ -971,13 +1006,13 @@ async def handle_keyboard_input(
n.start_soon(
partial(
fill_results,
search,
searchw,
recv,
)
)
bar.focus()
search.show_cache_entries()
searchbar.focus()
searchw.show_cache_entries()
await trio.sleep(0)
async for kbmsg in recv_chan:
@ -994,16 +1029,24 @@ async def handle_keyboard_input(
Qt.Key_Return
):
_search_enabled = False
await search.chart_current_item(clear_to_cache=True)
search.show_cache_entries(only=True)
await searchw.chart_current_item(clear_to_cache=True)
# XXX: causes hang and segfault..
# searchw.show_cache_entries(
# only=True,
# keep_current_item_selected=True,
# )
view.show_matches()
search.focus()
elif not ctl and not bar.text():
searchw.focus()
elif (
not ctl
and not searchbar.text()
):
# TODO: really should factor this somewhere..bc
# we're doin it in another spot as well..
search.show_cache_entries(only=True)
searchw.show_cache_entries(only=True)
continue
# cancel and close
@ -1012,7 +1055,7 @@ async def handle_keyboard_input(
Qt.Key_Space, # i feel like this is the "native" one
Qt.Key_Alt,
}:
bar.unfocus()
searchbar.unfocus()
# kill the search and focus back on main chart
if godwidget:
@ -1020,41 +1063,54 @@ async def handle_keyboard_input(
continue
if ctl and key in {
Qt.Key_L,
}:
if (
ctl
and key in {Qt.Key_L}
):
# like url (link) highlight in a web browser
bar.focus()
searchbar.focus()
# selection navigation controls
elif ctl and key in {
Qt.Key_D,
}:
elif (
ctl
and key in {Qt.Key_D}
):
view.next_section(direction='down')
_search_enabled = False
elif ctl and key in {
Qt.Key_U,
}:
elif (
ctl
and key in {Qt.Key_U}
):
view.next_section(direction='up')
_search_enabled = False
# selection navigation controls
elif (ctl and key in {
elif (
ctl and (
key in {
Qt.Key_K,
Qt.Key_J,
}
Qt.Key_K,
Qt.Key_J,
}) or key in {
Qt.Key_Up,
Qt.Key_Down,
}:
or key in {
Qt.Key_Up,
Qt.Key_Down,
}
)
):
_search_enabled = False
if key in {Qt.Key_K, Qt.Key_Up}:
if key in {
Qt.Key_K,
Qt.Key_Up
}:
item = view.select_previous()
elif key in {Qt.Key_J, Qt.Key_Down}:
elif key in {
Qt.Key_J,
Qt.Key_Down,
}:
item = view.select_next()
if item:
@ -1063,15 +1119,18 @@ async def handle_keyboard_input(
# if we're in the cache section and thus the next
# selection is a cache item, switch and show it
# immediately since it should be very fast.
if parent_item and parent_item.text() == 'cache':
await search.chart_current_item(clear_to_cache=False)
if (
parent_item
and parent_item.text() == 'cache'
):
await searchw.chart_current_item(clear_to_cache=False)
# ACTUAL SEARCH BLOCK #
# where we fuzzy complete and fill out sections.
elif not ctl:
# relay to completer task
_search_enabled = True
send.send_nowait(search.bar.text())
send.send_nowait(searchw.bar.text())
_search_active.set()