Compare commits

..

No commits in common. "e2e3e30d7a7f50075f1e7c5a994e8245e4c98120" and "61c4147b7384de57f51b64d2eeb6df7451ef3442" have entirely different histories.

10 changed files with 148 additions and 152 deletions

View File

@ -94,6 +94,21 @@ async def open_history_client(
yield get_ohlc, {'erlangs': 3, 'rate': 3} yield get_ohlc, {'erlangs': 3, 'rate': 3}
async def backfill_bars(
symbol: str,
shm: ShmArray, # type: ignore # noqa
task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,
) -> None:
"""Fill historical bars into shared mem / storage afap.
"""
instrument = symbol
with trio.CancelScope() as cs:
async with open_cached_client('deribit') as client:
bars = await client.bars(instrument)
shm.push(bars)
task_status.started(cs)
async def stream_quotes( async def stream_quotes(
send_chan: trio.abc.SendChannel, send_chan: trio.abc.SendChannel,

View File

@ -303,6 +303,24 @@ async def open_history_client(
yield get_ohlc, {'erlangs': 1, 'rate': 1} yield get_ohlc, {'erlangs': 1, 'rate': 1}
async def backfill_bars(
sym: str,
shm: ShmArray, # type: ignore # noqa
count: int = 10, # NOTE: any more and we'll overrun the underlying buffer
task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,
) -> None:
'''
Fill historical bars into shared mem / storage afap.
'''
with trio.CancelScope() as cs:
async with open_cached_client('kraken') as client:
bars = await client.bars(symbol=sym)
shm.push(bars)
task_status.started(cs)
async def stream_quotes( async def stream_quotes(
send_chan: trio.abc.SendChannel, send_chan: trio.abc.SendChannel,
@ -401,15 +419,14 @@ async def stream_quotes(
yield yield
# unsub from all pairs on teardown # unsub from all pairs on teardown
if ws.connected(): await ws.send_msg({
await ws.send_msg({ 'pair': list(ws_pairs.values()),
'pair': list(ws_pairs.values()), 'event': 'unsubscribe',
'event': 'unsubscribe', 'subscription': ['ohlc', 'spread'],
'subscription': ['ohlc', 'spread'], })
})
# XXX: do we need to ack the unsub? # XXX: do we need to ack the unsub?
# await ws.recv_msg() # await ws.recv_msg()
# see the tips on reconnection logic: # see the tips on reconnection logic:
# https://support.kraken.com/hc/en-us/articles/360044504011-WebSocket-API-unexpected-disconnections-from-market-data-feeds # https://support.kraken.com/hc/en-us/articles/360044504011-WebSocket-API-unexpected-disconnections-from-market-data-feeds

View File

@ -172,7 +172,6 @@ async def clear_dark_triggers(
# TODO: # TODO:
# - numba all this! # - numba all this!
# - this stream may eventually contain multiple symbols # - this stream may eventually contain multiple symbols
quote_stream._raise_on_lag = False
async for quotes in quote_stream: async for quotes in quote_stream:
# start = time.time() # start = time.time()
for sym, quote in quotes.items(): for sym, quote in quotes.items():

View File

@ -414,9 +414,7 @@ async def start_backfill(
and starts[next_start_dt] <= 6 and starts[next_start_dt] <= 6
): ):
start_dt = min(starts) start_dt = min(starts)
log.warning( print(f"SKIPPING DUPLICATE FRAME @ {next_start_dt}")
f"{bfqsn}: skipping duplicate frame @ {next_start_dt}"
)
starts[start_dt] += 1 starts[start_dt] += 1
continue continue
@ -618,7 +616,10 @@ async def tsdb_backfill(
# unblock the feed bus management task # unblock the feed bus management task
# assert len(shms[1].array) # assert len(shms[1].array)
task_status.started() task_status.started((
shms[60],
shms[1],
))
async def back_load_from_tsdb( async def back_load_from_tsdb(
timeframe: int, timeframe: int,
@ -867,7 +868,10 @@ async def manage_history(
marketstore.open_storage_client(fqsn)as storage, marketstore.open_storage_client(fqsn)as storage,
): ):
# TODO: drop returning the output that we pass in? # TODO: drop returning the output that we pass in?
await bus.nursery.start( (
hist_shm,
rt_shm,
) = await bus.nursery.start(
tsdb_backfill, tsdb_backfill,
mod, mod,
marketstore, marketstore,

View File

@ -21,9 +21,7 @@ core task logic for processing chains
from dataclasses import dataclass from dataclasses import dataclass
from functools import partial from functools import partial
from typing import ( from typing import (
AsyncIterator, AsyncIterator, Callable, Optional,
Callable,
Optional,
Union, Union,
) )
@ -388,7 +386,7 @@ async def cascade(
) -> tuple[TaskTracker, int]: ) -> tuple[TaskTracker, int]:
# TODO: adopt an incremental update engine/approach # TODO: adopt an incremental update engine/approach
# where possible here eventually! # where possible here eventually!
log.info(f're-syncing fsp {func_name} to source') log.debug(f're-syncing fsp {func_name} to source')
tracker.cs.cancel() tracker.cs.cancel()
await tracker.complete.wait() await tracker.complete.wait()
tracker, index = await n.start(fsp_target) tracker, index = await n.start(fsp_target)
@ -431,7 +429,6 @@ async def cascade(
tracker: TaskTracker, tracker: TaskTracker,
src: ShmArray, src: ShmArray,
dst: ShmArray, dst: ShmArray,
) -> tuple[TaskTracker, int]: ) -> tuple[TaskTracker, int]:
synced, step_diff, _ = is_synced(src, dst) synced, step_diff, _ = is_synced(src, dst)
@ -447,8 +444,7 @@ async def cascade(
# signal # signal
times = src.array['time'] times = src.array['time']
if len(times) > 1: if len(times) > 1:
last_ts = times[-1] delay_s = float(times[-1] - times[times != times[-1]][-1])
delay_s = float(last_ts - times[times != last_ts][-1])
else: else:
# our default "HFT" sample rate. # our default "HFT" sample rate.
delay_s = _default_delay_s delay_s = _default_delay_s

View File

@ -127,10 +127,7 @@ class GodWidget(QWidget):
# self.init_strategy_ui() # self.init_strategy_ui()
# self.vbox.addLayout(self.hbox) # self.vbox.addLayout(self.hbox)
self._chart_cache: dict[ self._chart_cache: dict[str, LinkedSplits] = {}
str,
tuple[LinkedSplits, LinkedSplits],
] = {}
self.hist_linked: Optional[LinkedSplits] = None self.hist_linked: Optional[LinkedSplits] = None
self.rt_linked: Optional[LinkedSplits] = None self.rt_linked: Optional[LinkedSplits] = None
@ -150,6 +147,23 @@ class GodWidget(QWidget):
def linkedsplits(self) -> LinkedSplits: def linkedsplits(self) -> LinkedSplits:
return self.rt_linked return self.rt_linked
# def init_timeframes_ui(self):
# self.tf_layout = QHBoxLayout()
# self.tf_layout.setSpacing(0)
# self.tf_layout.setContentsMargins(0, 12, 0, 0)
# time_frames = ('1M', '5M', '15M', '30M', '1H', '1D', '1W', 'MN')
# btn_prefix = 'TF'
# for tf in time_frames:
# btn_name = ''.join([btn_prefix, tf])
# btn = QtWidgets.QPushButton(tf)
# # TODO:
# btn.setEnabled(False)
# setattr(self, btn_name, btn)
# self.tf_layout.addWidget(btn)
# self.toolbar_layout.addLayout(self.tf_layout)
# XXX: strat loader/saver that we don't need yet. # XXX: strat loader/saver that we don't need yet.
# def init_strategy_ui(self): # def init_strategy_ui(self):
# self.strategy_box = StrategyBoxWidget(self) # self.strategy_box = StrategyBoxWidget(self)

View File

@ -19,10 +19,6 @@ Data vizualization APIs
''' '''
from __future__ import annotations from __future__ import annotations
from math import (
ceil,
floor,
)
from typing import ( from typing import (
Optional, Optional,
Literal, Literal,
@ -460,13 +456,13 @@ class Viz(msgspec.Struct): # , frozen=True):
array = self.shm.array array = self.shm.array
index = array[index_field] index = array[index_field]
first = floor(index[0]) first = round(index[0])
last = ceil(index[-1]) last = round(index[-1])
# first and last datums in view determined by # first and last datums in view determined by
# l / r view range. # l / r view range.
leftmost = floor(l) leftmost = round(l)
rightmost = ceil(r) rightmost = round(r)
# invalid view state # invalid view state
if ( if (

View File

@ -457,7 +457,7 @@ def graphics_update_cycle(
# state-tracking ``chart_maxmin()`` routine from above? # state-tracking ``chart_maxmin()`` routine from above?
chart = ds.chart chart = ds.chart
hist_chart = ds.hist_chart hist_chart = ds.godwidget.hist_linked.chart
flume = ds.flume flume = ds.flume
sym = flume.symbol sym = flume.symbol

View File

@ -24,6 +24,7 @@ for fast incremental update.
''' '''
from __future__ import annotations from __future__ import annotations
from typing import ( from typing import (
Optional,
TYPE_CHECKING, TYPE_CHECKING,
) )
@ -60,6 +61,17 @@ class Renderer(msgspec.Struct):
path: QPainterPath | None = None path: QPainterPath | None = None
fast_path: QPainterPath | None = None fast_path: QPainterPath | None = None
# XXX: just ideas..
# called on the final data (transform) output to convert
# to "graphical data form" a format that can be passed to
# the ``.draw()`` implementation.
# graphics_t: Optional[Callable[ShmArray, np.ndarray]] = None
# graphics_t_shm: Optional[ShmArray] = None
# path graphics update implementation methods
# prepend_fn: Optional[Callable[QPainterPath, QPainterPath]] = None
# append_fn: Optional[Callable[QPainterPath, QPainterPath]] = None
# downsampling state # downsampling state
_last_uppx: float = 0 _last_uppx: float = 0
_in_ds: bool = False _in_ds: bool = False
@ -206,7 +218,7 @@ class Renderer(msgspec.Struct):
): ):
# print(f"{self.viz.name} -> REDRAWING BRUH") # print(f"{self.viz.name} -> REDRAWING BRUH")
if new_sample_rate and showing_src_data: if new_sample_rate and showing_src_data:
log.info(f'DE-downsampling -> {array_key}') log.info(f'DEDOWN -> {array_key}')
self._in_ds = False self._in_ds = False
elif should_ds and uppx > 1: elif should_ds and uppx > 1:
@ -257,7 +269,10 @@ class Renderer(msgspec.Struct):
append_length > 0 append_length > 0
and do_append and do_append
): ):
profiler(f'sliced append path {append_length}') print(f'{array_key} append len: {append_length}')
# new_x = x_1d[-append_length - 2:] # slice_to_head]
# new_y = y_1d[-append_length - 2:] # slice_to_head]
profiler('sliced append path')
# ( # (
# x_1d, # x_1d,
# y_1d, # y_1d,
@ -285,23 +300,22 @@ class Renderer(msgspec.Struct):
profiler('generated append qpath') profiler('generated append qpath')
if use_fpath: if use_fpath:
# print(f'{self.viz.name}: FAST PATH')
# an attempt at trying to make append-updates faster.. # an attempt at trying to make append-updates faster..
if fast_path is None: if fast_path is None:
fast_path = append_path fast_path = append_path
# fast_path.reserve(int(6e3)) # fast_path.reserve(int(6e3))
else: else:
# print(
# f'{self.viz.name}: FAST PATH\n'
# f"append_path br: {append_path.boundingRect()}\n"
# f"path size: {size}\n"
# f"append_path len: {append_path.length()}\n"
# f"fast_path len: {fast_path.length()}\n"
# )
fast_path.connectPath(append_path) fast_path.connectPath(append_path)
size = fast_path.capacity() size = fast_path.capacity()
profiler(f'connected fast path w size: {size}') profiler(f'connected fast path w size: {size}')
print(
f"append_path br: {append_path.boundingRect()}\n"
f"path size: {size}\n"
f"append_path len: {append_path.length()}\n"
f"fast_path len: {fast_path.length()}\n"
)
# graphics.path.moveTo(new_x[0], new_y[0]) # graphics.path.moveTo(new_x[0], new_y[0])
# path.connectPath(append_path) # path.connectPath(append_path)

View File

@ -144,29 +144,15 @@ class CompleterView(QTreeView):
self._font_size: int = 0 # pixels self._font_size: int = 0 # pixels
self._init: bool = False self._init: bool = False
async def on_pressed( async def on_pressed(self, idx: QModelIndex) -> None:
self,
idx: QModelIndex,
) -> None:
''' '''
Mouse pressed on view handler. Mouse pressed on view handler.
''' '''
search = self.parent() search = self.parent()
await search.chart_current_item()
await search.chart_current_item(
clear_to_cache=True,
)
# XXX: this causes Qt to hang and segfault..lovely
# self.show_cache_entries(
# only=True,
# keep_current_item_selected=True,
# )
search.focus() search.focus()
def set_font_size(self, size: int = 18): def set_font_size(self, size: int = 18):
# print(size) # print(size)
if size < 0: if size < 0:
@ -302,7 +288,7 @@ class CompleterView(QTreeView):
def select_first(self) -> QStandardItem: def select_first(self) -> QStandardItem:
''' '''
Select the first depth >= 2 entry from the completer tree and Select the first depth >= 2 entry from the completer tree and
return its item. return it's item.
''' '''
# ensure we're **not** selecting the first level parent node and # ensure we're **not** selecting the first level parent node and
@ -629,8 +615,6 @@ class SearchWidget(QtWidgets.QWidget):
def show_cache_entries( def show_cache_entries(
self, self,
only: bool = False, only: bool = False,
keep_current_item_selected: bool = False,
) -> None: ) -> None:
''' '''
Clear the search results view and show only cached (aka recently Clear the search results view and show only cached (aka recently
@ -640,14 +624,10 @@ class SearchWidget(QtWidgets.QWidget):
godw = self.godwidget godw = self.godwidget
# first entry in the cache is the current symbol(s) # first entry in the cache is the current symbol(s)
fqsns = set() fqsns = []
for multi_fqsns in list(godw._chart_cache):
for fqsn in set(multi_fqsns):
fqsns.add(fqsn)
if keep_current_item_selected: for multi_fqsns in list(godw._chart_cache):
sel = self.view.selectionModel() fqsns.extend(list(multi_fqsns))
cidx = sel.currentIndex()
self.view.set_section_entries( self.view.set_section_entries(
'cache', 'cache',
@ -657,17 +637,7 @@ class SearchWidget(QtWidgets.QWidget):
reverse=True, reverse=True,
) )
if ( def get_current_item(self) -> Optional[tuple[str, str]]:
keep_current_item_selected
and cidx.isValid()
):
# set current selection back to what it was before filling out
# the view results.
self.view.select_from_idx(cidx)
else:
self.view.select_first()
def get_current_item(self) -> tuple[QModelIndex, str, str] | None:
''' '''
Return the current completer tree selection as Return the current completer tree selection as
a tuple ``(parent: str, child: str)`` if valid, else ``None``. a tuple ``(parent: str, child: str)`` if valid, else ``None``.
@ -695,11 +665,7 @@ class SearchWidget(QtWidgets.QWidget):
if provider == 'cache': if provider == 'cache':
symbol, _, provider = symbol.rpartition('.') symbol, _, provider = symbol.rpartition('.')
return ( return provider, symbol
cidx,
provider,
symbol,
)
else: else:
return None return None
@ -720,7 +686,7 @@ class SearchWidget(QtWidgets.QWidget):
if value is None: if value is None:
return None return None
cidx, provider, symbol = value provider, symbol = value
godw = self.godwidget godw = self.godwidget
fqsn = f'{symbol}.{provider}' fqsn = f'{symbol}.{provider}'
@ -749,9 +715,7 @@ class SearchWidget(QtWidgets.QWidget):
godw.rt_linked, godw.rt_linked,
) )
) )
self.show_cache_entries( self.show_cache_entries(only=True)
only=True,
)
self.bar.focus() self.bar.focus()
return fqsn return fqsn
@ -992,10 +956,11 @@ async def handle_keyboard_input(
global _search_active, _search_enabled global _search_active, _search_enabled
# startup # startup
searchw = searchbar.parent() bar = searchbar
godwidget = searchw.godwidget search = searchbar.parent()
view = searchbar.view godwidget = search.godwidget
view.set_font_size(searchbar.dpi_font.px_size) view = bar.view
view.set_font_size(bar.dpi_font.px_size)
send, recv = trio.open_memory_channel(616) send, recv = trio.open_memory_channel(616)
async with trio.open_nursery() as n: async with trio.open_nursery() as n:
@ -1006,13 +971,13 @@ async def handle_keyboard_input(
n.start_soon( n.start_soon(
partial( partial(
fill_results, fill_results,
searchw, search,
recv, recv,
) )
) )
searchbar.focus() bar.focus()
searchw.show_cache_entries() search.show_cache_entries()
await trio.sleep(0) await trio.sleep(0)
async for kbmsg in recv_chan: async for kbmsg in recv_chan:
@ -1029,24 +994,16 @@ async def handle_keyboard_input(
Qt.Key_Return Qt.Key_Return
): ):
_search_enabled = False _search_enabled = False
await searchw.chart_current_item(clear_to_cache=True) await search.chart_current_item(clear_to_cache=True)
search.show_cache_entries(only=True)
# XXX: causes hang and segfault..
# searchw.show_cache_entries(
# only=True,
# keep_current_item_selected=True,
# )
view.show_matches() view.show_matches()
searchw.focus() search.focus()
elif not ctl and not bar.text():
elif (
not ctl
and not searchbar.text()
):
# TODO: really should factor this somewhere..bc # TODO: really should factor this somewhere..bc
# we're doin it in another spot as well.. # we're doin it in another spot as well..
searchw.show_cache_entries(only=True) search.show_cache_entries(only=True)
continue continue
# cancel and close # cancel and close
@ -1055,7 +1012,7 @@ async def handle_keyboard_input(
Qt.Key_Space, # i feel like this is the "native" one Qt.Key_Space, # i feel like this is the "native" one
Qt.Key_Alt, Qt.Key_Alt,
}: }:
searchbar.unfocus() bar.unfocus()
# kill the search and focus back on main chart # kill the search and focus back on main chart
if godwidget: if godwidget:
@ -1063,54 +1020,41 @@ async def handle_keyboard_input(
continue continue
if ( if ctl and key in {
ctl Qt.Key_L,
and key in {Qt.Key_L} }:
):
# like url (link) highlight in a web browser # like url (link) highlight in a web browser
searchbar.focus() bar.focus()
# selection navigation controls # selection navigation controls
elif ( elif ctl and key in {
ctl Qt.Key_D,
and key in {Qt.Key_D} }:
):
view.next_section(direction='down') view.next_section(direction='down')
_search_enabled = False _search_enabled = False
elif ( elif ctl and key in {
ctl Qt.Key_U,
and key in {Qt.Key_U} }:
):
view.next_section(direction='up') view.next_section(direction='up')
_search_enabled = False _search_enabled = False
# selection navigation controls # selection navigation controls
elif ( elif (ctl and key in {
ctl and (
key in {
Qt.Key_K,
Qt.Key_J,
}
or key in { Qt.Key_K,
Qt.Key_Up, Qt.Key_J,
Qt.Key_Down,
} }) or key in {
)
): Qt.Key_Up,
Qt.Key_Down,
}:
_search_enabled = False _search_enabled = False
if key in {Qt.Key_K, Qt.Key_Up}:
if key in {
Qt.Key_K,
Qt.Key_Up
}:
item = view.select_previous() item = view.select_previous()
elif key in { elif key in {Qt.Key_J, Qt.Key_Down}:
Qt.Key_J,
Qt.Key_Down,
}:
item = view.select_next() item = view.select_next()
if item: if item:
@ -1119,18 +1063,15 @@ async def handle_keyboard_input(
# if we're in the cache section and thus the next # if we're in the cache section and thus the next
# selection is a cache item, switch and show it # selection is a cache item, switch and show it
# immediately since it should be very fast. # immediately since it should be very fast.
if ( if parent_item and parent_item.text() == 'cache':
parent_item await search.chart_current_item(clear_to_cache=False)
and parent_item.text() == 'cache'
):
await searchw.chart_current_item(clear_to_cache=False)
# ACTUAL SEARCH BLOCK # # ACTUAL SEARCH BLOCK #
# where we fuzzy complete and fill out sections. # where we fuzzy complete and fill out sections.
elif not ctl: elif not ctl:
# relay to completer task # relay to completer task
_search_enabled = True _search_enabled = True
send.send_nowait(searchw.bar.text()) send.send_nowait(search.bar.text())
_search_active.set() _search_active.set()