Compare commits

..

No commits in common. "e2e3e30d7a7f50075f1e7c5a994e8245e4c98120" and "61c4147b7384de57f51b64d2eeb6df7451ef3442" have entirely different histories.

10 changed files with 148 additions and 152 deletions

View File

@ -94,6 +94,21 @@ async def open_history_client(
yield get_ohlc, {'erlangs': 3, 'rate': 3}
async def backfill_bars(
symbol: str,
shm: ShmArray, # type: ignore # noqa
task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,
) -> None:
"""Fill historical bars into shared mem / storage afap.
"""
instrument = symbol
with trio.CancelScope() as cs:
async with open_cached_client('deribit') as client:
bars = await client.bars(instrument)
shm.push(bars)
task_status.started(cs)
async def stream_quotes(
send_chan: trio.abc.SendChannel,

View File

@ -303,6 +303,24 @@ async def open_history_client(
yield get_ohlc, {'erlangs': 1, 'rate': 1}
async def backfill_bars(
sym: str,
shm: ShmArray, # type: ignore # noqa
count: int = 10, # NOTE: any more and we'll overrun the underlying buffer
task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,
) -> None:
'''
Fill historical bars into shared mem / storage afap.
'''
with trio.CancelScope() as cs:
async with open_cached_client('kraken') as client:
bars = await client.bars(symbol=sym)
shm.push(bars)
task_status.started(cs)
async def stream_quotes(
send_chan: trio.abc.SendChannel,
@ -401,15 +419,14 @@ async def stream_quotes(
yield
# unsub from all pairs on teardown
if ws.connected():
await ws.send_msg({
'pair': list(ws_pairs.values()),
'event': 'unsubscribe',
'subscription': ['ohlc', 'spread'],
})
await ws.send_msg({
'pair': list(ws_pairs.values()),
'event': 'unsubscribe',
'subscription': ['ohlc', 'spread'],
})
# XXX: do we need to ack the unsub?
# await ws.recv_msg()
# XXX: do we need to ack the unsub?
# await ws.recv_msg()
# see the tips on reconnection logic:
# https://support.kraken.com/hc/en-us/articles/360044504011-WebSocket-API-unexpected-disconnections-from-market-data-feeds

View File

@ -172,7 +172,6 @@ async def clear_dark_triggers(
# TODO:
# - numba all this!
# - this stream may eventually contain multiple symbols
quote_stream._raise_on_lag = False
async for quotes in quote_stream:
# start = time.time()
for sym, quote in quotes.items():

View File

@ -414,9 +414,7 @@ async def start_backfill(
and starts[next_start_dt] <= 6
):
start_dt = min(starts)
log.warning(
f"{bfqsn}: skipping duplicate frame @ {next_start_dt}"
)
print(f"SKIPPING DUPLICATE FRAME @ {next_start_dt}")
starts[start_dt] += 1
continue
@ -618,7 +616,10 @@ async def tsdb_backfill(
# unblock the feed bus management task
# assert len(shms[1].array)
task_status.started()
task_status.started((
shms[60],
shms[1],
))
async def back_load_from_tsdb(
timeframe: int,
@ -867,7 +868,10 @@ async def manage_history(
marketstore.open_storage_client(fqsn)as storage,
):
# TODO: drop returning the output that we pass in?
await bus.nursery.start(
(
hist_shm,
rt_shm,
) = await bus.nursery.start(
tsdb_backfill,
mod,
marketstore,

View File

@ -21,9 +21,7 @@ core task logic for processing chains
from dataclasses import dataclass
from functools import partial
from typing import (
AsyncIterator,
Callable,
Optional,
AsyncIterator, Callable, Optional,
Union,
)
@ -388,7 +386,7 @@ async def cascade(
) -> tuple[TaskTracker, int]:
# TODO: adopt an incremental update engine/approach
# where possible here eventually!
log.info(f're-syncing fsp {func_name} to source')
log.debug(f're-syncing fsp {func_name} to source')
tracker.cs.cancel()
await tracker.complete.wait()
tracker, index = await n.start(fsp_target)
@ -431,7 +429,6 @@ async def cascade(
tracker: TaskTracker,
src: ShmArray,
dst: ShmArray,
) -> tuple[TaskTracker, int]:
synced, step_diff, _ = is_synced(src, dst)
@ -447,8 +444,7 @@ async def cascade(
# signal
times = src.array['time']
if len(times) > 1:
last_ts = times[-1]
delay_s = float(last_ts - times[times != last_ts][-1])
delay_s = float(times[-1] - times[times != times[-1]][-1])
else:
# our default "HFT" sample rate.
delay_s = _default_delay_s

View File

@ -127,10 +127,7 @@ class GodWidget(QWidget):
# self.init_strategy_ui()
# self.vbox.addLayout(self.hbox)
self._chart_cache: dict[
str,
tuple[LinkedSplits, LinkedSplits],
] = {}
self._chart_cache: dict[str, LinkedSplits] = {}
self.hist_linked: Optional[LinkedSplits] = None
self.rt_linked: Optional[LinkedSplits] = None
@ -150,6 +147,23 @@ class GodWidget(QWidget):
def linkedsplits(self) -> LinkedSplits:
return self.rt_linked
# def init_timeframes_ui(self):
# self.tf_layout = QHBoxLayout()
# self.tf_layout.setSpacing(0)
# self.tf_layout.setContentsMargins(0, 12, 0, 0)
# time_frames = ('1M', '5M', '15M', '30M', '1H', '1D', '1W', 'MN')
# btn_prefix = 'TF'
# for tf in time_frames:
# btn_name = ''.join([btn_prefix, tf])
# btn = QtWidgets.QPushButton(tf)
# # TODO:
# btn.setEnabled(False)
# setattr(self, btn_name, btn)
# self.tf_layout.addWidget(btn)
# self.toolbar_layout.addLayout(self.tf_layout)
# XXX: strat loader/saver that we don't need yet.
# def init_strategy_ui(self):
# self.strategy_box = StrategyBoxWidget(self)

View File

@ -19,10 +19,6 @@ Data vizualization APIs
'''
from __future__ import annotations
from math import (
ceil,
floor,
)
from typing import (
Optional,
Literal,
@ -460,13 +456,13 @@ class Viz(msgspec.Struct): # , frozen=True):
array = self.shm.array
index = array[index_field]
first = floor(index[0])
last = ceil(index[-1])
first = round(index[0])
last = round(index[-1])
# first and last datums in view determined by
# l / r view range.
leftmost = floor(l)
rightmost = ceil(r)
leftmost = round(l)
rightmost = round(r)
# invalid view state
if (

View File

@ -457,7 +457,7 @@ def graphics_update_cycle(
# state-tracking ``chart_maxmin()`` routine from above?
chart = ds.chart
hist_chart = ds.hist_chart
hist_chart = ds.godwidget.hist_linked.chart
flume = ds.flume
sym = flume.symbol

View File

@ -24,6 +24,7 @@ for fast incremental update.
'''
from __future__ import annotations
from typing import (
Optional,
TYPE_CHECKING,
)
@ -60,6 +61,17 @@ class Renderer(msgspec.Struct):
path: QPainterPath | None = None
fast_path: QPainterPath | None = None
# XXX: just ideas..
# called on the final data (transform) output to convert
# to "graphical data form" a format that can be passed to
# the ``.draw()`` implementation.
# graphics_t: Optional[Callable[ShmArray, np.ndarray]] = None
# graphics_t_shm: Optional[ShmArray] = None
# path graphics update implementation methods
# prepend_fn: Optional[Callable[QPainterPath, QPainterPath]] = None
# append_fn: Optional[Callable[QPainterPath, QPainterPath]] = None
# downsampling state
_last_uppx: float = 0
_in_ds: bool = False
@ -206,7 +218,7 @@ class Renderer(msgspec.Struct):
):
# print(f"{self.viz.name} -> REDRAWING BRUH")
if new_sample_rate and showing_src_data:
log.info(f'DE-downsampling -> {array_key}')
log.info(f'DEDOWN -> {array_key}')
self._in_ds = False
elif should_ds and uppx > 1:
@ -257,7 +269,10 @@ class Renderer(msgspec.Struct):
append_length > 0
and do_append
):
profiler(f'sliced append path {append_length}')
print(f'{array_key} append len: {append_length}')
# new_x = x_1d[-append_length - 2:] # slice_to_head]
# new_y = y_1d[-append_length - 2:] # slice_to_head]
profiler('sliced append path')
# (
# x_1d,
# y_1d,
@ -285,23 +300,22 @@ class Renderer(msgspec.Struct):
profiler('generated append qpath')
if use_fpath:
# print(f'{self.viz.name}: FAST PATH')
# an attempt at trying to make append-updates faster..
if fast_path is None:
fast_path = append_path
# fast_path.reserve(int(6e3))
else:
# print(
# f'{self.viz.name}: FAST PATH\n'
# f"append_path br: {append_path.boundingRect()}\n"
# f"path size: {size}\n"
# f"append_path len: {append_path.length()}\n"
# f"fast_path len: {fast_path.length()}\n"
# )
fast_path.connectPath(append_path)
size = fast_path.capacity()
profiler(f'connected fast path w size: {size}')
print(
f"append_path br: {append_path.boundingRect()}\n"
f"path size: {size}\n"
f"append_path len: {append_path.length()}\n"
f"fast_path len: {fast_path.length()}\n"
)
# graphics.path.moveTo(new_x[0], new_y[0])
# path.connectPath(append_path)

View File

@ -144,29 +144,15 @@ class CompleterView(QTreeView):
self._font_size: int = 0 # pixels
self._init: bool = False
async def on_pressed(
self,
idx: QModelIndex,
) -> None:
async def on_pressed(self, idx: QModelIndex) -> None:
'''
Mouse pressed on view handler.
'''
search = self.parent()
await search.chart_current_item(
clear_to_cache=True,
)
# XXX: this causes Qt to hang and segfault..lovely
# self.show_cache_entries(
# only=True,
# keep_current_item_selected=True,
# )
await search.chart_current_item()
search.focus()
def set_font_size(self, size: int = 18):
# print(size)
if size < 0:
@ -302,7 +288,7 @@ class CompleterView(QTreeView):
def select_first(self) -> QStandardItem:
'''
Select the first depth >= 2 entry from the completer tree and
return its item.
return it's item.
'''
# ensure we're **not** selecting the first level parent node and
@ -629,8 +615,6 @@ class SearchWidget(QtWidgets.QWidget):
def show_cache_entries(
self,
only: bool = False,
keep_current_item_selected: bool = False,
) -> None:
'''
Clear the search results view and show only cached (aka recently
@ -640,14 +624,10 @@ class SearchWidget(QtWidgets.QWidget):
godw = self.godwidget
# first entry in the cache is the current symbol(s)
fqsns = set()
for multi_fqsns in list(godw._chart_cache):
for fqsn in set(multi_fqsns):
fqsns.add(fqsn)
fqsns = []
if keep_current_item_selected:
sel = self.view.selectionModel()
cidx = sel.currentIndex()
for multi_fqsns in list(godw._chart_cache):
fqsns.extend(list(multi_fqsns))
self.view.set_section_entries(
'cache',
@ -657,17 +637,7 @@ class SearchWidget(QtWidgets.QWidget):
reverse=True,
)
if (
keep_current_item_selected
and cidx.isValid()
):
# set current selection back to what it was before filling out
# the view results.
self.view.select_from_idx(cidx)
else:
self.view.select_first()
def get_current_item(self) -> tuple[QModelIndex, str, str] | None:
def get_current_item(self) -> Optional[tuple[str, str]]:
'''
Return the current completer tree selection as
a tuple ``(parent: str, child: str)`` if valid, else ``None``.
@ -695,11 +665,7 @@ class SearchWidget(QtWidgets.QWidget):
if provider == 'cache':
symbol, _, provider = symbol.rpartition('.')
return (
cidx,
provider,
symbol,
)
return provider, symbol
else:
return None
@ -720,7 +686,7 @@ class SearchWidget(QtWidgets.QWidget):
if value is None:
return None
cidx, provider, symbol = value
provider, symbol = value
godw = self.godwidget
fqsn = f'{symbol}.{provider}'
@ -749,9 +715,7 @@ class SearchWidget(QtWidgets.QWidget):
godw.rt_linked,
)
)
self.show_cache_entries(
only=True,
)
self.show_cache_entries(only=True)
self.bar.focus()
return fqsn
@ -992,10 +956,11 @@ async def handle_keyboard_input(
global _search_active, _search_enabled
# startup
searchw = searchbar.parent()
godwidget = searchw.godwidget
view = searchbar.view
view.set_font_size(searchbar.dpi_font.px_size)
bar = searchbar
search = searchbar.parent()
godwidget = search.godwidget
view = bar.view
view.set_font_size(bar.dpi_font.px_size)
send, recv = trio.open_memory_channel(616)
async with trio.open_nursery() as n:
@ -1006,13 +971,13 @@ async def handle_keyboard_input(
n.start_soon(
partial(
fill_results,
searchw,
search,
recv,
)
)
searchbar.focus()
searchw.show_cache_entries()
bar.focus()
search.show_cache_entries()
await trio.sleep(0)
async for kbmsg in recv_chan:
@ -1029,24 +994,16 @@ async def handle_keyboard_input(
Qt.Key_Return
):
_search_enabled = False
await searchw.chart_current_item(clear_to_cache=True)
# XXX: causes hang and segfault..
# searchw.show_cache_entries(
# only=True,
# keep_current_item_selected=True,
# )
await search.chart_current_item(clear_to_cache=True)
search.show_cache_entries(only=True)
view.show_matches()
searchw.focus()
search.focus()
elif not ctl and not bar.text():
elif (
not ctl
and not searchbar.text()
):
# TODO: really should factor this somewhere..bc
# we're doin it in another spot as well..
searchw.show_cache_entries(only=True)
search.show_cache_entries(only=True)
continue
# cancel and close
@ -1055,7 +1012,7 @@ async def handle_keyboard_input(
Qt.Key_Space, # i feel like this is the "native" one
Qt.Key_Alt,
}:
searchbar.unfocus()
bar.unfocus()
# kill the search and focus back on main chart
if godwidget:
@ -1063,54 +1020,41 @@ async def handle_keyboard_input(
continue
if (
ctl
and key in {Qt.Key_L}
):
if ctl and key in {
Qt.Key_L,
}:
# like url (link) highlight in a web browser
searchbar.focus()
bar.focus()
# selection navigation controls
elif (
ctl
and key in {Qt.Key_D}
):
elif ctl and key in {
Qt.Key_D,
}:
view.next_section(direction='down')
_search_enabled = False
elif (
ctl
and key in {Qt.Key_U}
):
elif ctl and key in {
Qt.Key_U,
}:
view.next_section(direction='up')
_search_enabled = False
# selection navigation controls
elif (
ctl and (
key in {
Qt.Key_K,
Qt.Key_J,
}
elif (ctl and key in {
or key in {
Qt.Key_Up,
Qt.Key_Down,
}
)
):
Qt.Key_K,
Qt.Key_J,
}) or key in {
Qt.Key_Up,
Qt.Key_Down,
}:
_search_enabled = False
if key in {
Qt.Key_K,
Qt.Key_Up
}:
if key in {Qt.Key_K, Qt.Key_Up}:
item = view.select_previous()
elif key in {
Qt.Key_J,
Qt.Key_Down,
}:
elif key in {Qt.Key_J, Qt.Key_Down}:
item = view.select_next()
if item:
@ -1119,18 +1063,15 @@ async def handle_keyboard_input(
# if we're in the cache section and thus the next
# selection is a cache item, switch and show it
# immediately since it should be very fast.
if (
parent_item
and parent_item.text() == 'cache'
):
await searchw.chart_current_item(clear_to_cache=False)
if parent_item and parent_item.text() == 'cache':
await search.chart_current_item(clear_to_cache=False)
# ACTUAL SEARCH BLOCK #
# where we fuzzy complete and fill out sections.
elif not ctl:
# relay to completer task
_search_enabled = True
send.send_nowait(searchw.bar.text())
send.send_nowait(search.bar.text())
_search_active.set()