# piker: trading gear for hackers # Copyright (C) Tyler Goodlet (in stewardship for piker0) # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . """ qompleterz: embeddable search and complete using trio, Qt and fuzzywuzzy. """ # link set for hackzin on this stuff: # https://doc.qt.io/qt-5/qheaderview.html#moving-header-sections # https://doc.qt.io/qt-5/model-view-programming.html # https://doc.qt.io/qt-5/modelview.html # https://doc.qt.io/qt-5/qtreeview.html#selectedIndexes # https://doc.qt.io/qt-5/qmodelindex.html#siblingAtColumn # https://doc.qt.io/qt-5/qitemselectionmodel.html#currentIndex # https://www.programcreek.com/python/example/108109/PyQt5.QtWidgets.QTreeView # https://doc.qt.io/qt-5/qsyntaxhighlighter.html # https://github.com/qutebrowser/qutebrowser/blob/master/qutebrowser/completion/completiondelegate.py#L243 # https://forum.qt.io/topic/61343/highlight-matched-substrings-in-qstyleditemdelegate from collections import defaultdict from contextlib import asynccontextmanager from functools import partial from typing import ( Optional, Callable, Awaitable, Sequence, Any, AsyncIterator, Iterator, ) import time # from pprint import pformat from fuzzywuzzy import process as fuzzy import trio from trio_typing import TaskStatus from PyQt5 import QtCore from PyQt5 import QtWidgets from PyQt5.QtCore import ( Qt, QModelIndex, QItemSelectionModel, ) from PyQt5.QtGui import ( # QLayout, QStandardItem, QStandardItemModel, ) from PyQt5.QtWidgets import ( QWidget, QTreeView, # QListWidgetItem, # QAbstractScrollArea, # QStyledItemDelegate, ) from ..log import get_logger from ._style import ( _font, hcolor, ) from ._forms import Edit, FontScaledDelegate log = get_logger(__name__) class CompleterView(QTreeView): mode_name: str = 'search-nav' # XXX: relevant docs links: # - simple widget version of this: # https://doc.qt.io/qt-5/qtreewidget.html#details # - MVC high level instructional: # https://doc.qt.io/qt-5/model-view-programming.html # - MV tut: # https://doc.qt.io/qt-5/modelview.html # - custome header view (for doing stuff like we have in kivy?): # https://doc.qt.io/qt-5/qheaderview.html#moving-header-sections # TODO: selection model stuff for eventual aggregate feeds # charting and mgmt; # https://doc.qt.io/qt-5/qabstractitemview.html#setSelectionModel # https://doc.qt.io/qt-5/qitemselectionmodel.html # https://doc.qt.io/qt-5/modelview.html#3-2-working-with-selections # https://doc.qt.io/qt-5/model-view-programming.html#handling-selections-of-items # TODO: mouse extended handling: # https://doc.qt.io/qt-5/qabstractitemview.html#entered def __init__( self, parent=None, labels: list[str] = [], ) -> None: super().__init__(parent) model = QStandardItemModel(self) self.labels = labels # a std "tabular" config self.setItemDelegate(FontScaledDelegate(self)) self.setModel(model) self.setAlternatingRowColors(True) # TODO: size this based on DPI font self.setIndentation(_font.px_size) self.setUniformRowHeights(True) # self.setColumnWidth(0, 3) # self.setVerticalBarPolicy(Qt.ScrollBarAlwaysOff) # self.setSizeAdjustPolicy(QAbstractScrollArea.AdjustIgnored) # ux settings self.setSizePolicy( QtWidgets.QSizePolicy.Expanding, QtWidgets.QSizePolicy.Expanding, ) self.setItemsExpandable(True) self.setExpandsOnDoubleClick(False) self.setAnimated(False) self.setHorizontalScrollBarPolicy(Qt.ScrollBarAlwaysOff) # column headers model.setHorizontalHeaderLabels(labels) self._font_size: int = 0 # pixels self._init: bool = False async def on_pressed( self, idx: QModelIndex, ) -> None: ''' Mouse pressed on view handler. ''' search = self.parent() await search.chart_current_item( clear_to_cache=True, ) # XXX: this causes Qt to hang and segfault..lovely # self.show_cache_entries( # only=True, # keep_current_item_selected=True, # ) search.focus() def set_font_size(self, size: int = 18): # print(size) if size < 0: size = 16 self._font_size = size self.setStyleSheet(f"font: {size}px") def resize_to_results( self, w: Optional[float] = 0, h: Optional[float] = None, ) -> None: model = self.model() cols = model.columnCount() cidx = self.selectionModel().currentIndex() rows = model.rowCount() self.expandAll() # compute the approx height in pixels needed to include # all result rows in view. row_h = rows_h = self.rowHeight(cidx) * (rows + 1) for idx, item in self.iter_df_rows(): row_h = self.rowHeight(idx) rows_h += row_h # print(f'row_h: {row_h}\nrows_h: {rows_h}') # TODO: could we just break early here on detection # of ``rows_h >= h``? col_w_tot = 0 for i in range(cols): # only slap in a rows's height's worth # of padding once at startup.. no idea if ( not self._init and row_h ): col_w_tot = row_h self._init = True self.resizeColumnToContents(i) col_w_tot += self.columnWidth(i) # NOTE: if the heigh `h` set here is **too large** then the # resize event will perpetually trigger as the window causes # some kind of recompute of callbacks.. so we have to ensure # it's limited. if h: h: int = round(h) abs_mx = round(0.91 * h) self.setMaximumHeight(abs_mx) if rows_h <= abs_mx: # self.setMinimumHeight(rows_h) self.setMinimumHeight(rows_h) # self.setFixedHeight(rows_h) else: self.setMinimumHeight(abs_mx) # dyncamically size to width of longest result seen curr_w = self.width() if curr_w < col_w_tot: self.setMinimumWidth(col_w_tot) self.update() def is_selecting_d1(self) -> bool: cidx = self.selectionModel().currentIndex() return cidx.parent() == QModelIndex() def previous_index(self) -> QModelIndex: cidx = self.selectionModel().currentIndex() one_above = self.indexAbove(cidx) if one_above.parent() == QModelIndex(): # if the next node up's parent is the root we don't want to select # the next node up since it's a top level node and we only # select entries depth >= 2. # see if one more up is not the root and we can select it. two_above = self.indexAbove(one_above) if two_above != QModelIndex(): return two_above else: return cidx return one_above # just next up def next_index(self) -> QModelIndex: cidx = self.selectionModel().currentIndex() one_below = self.indexBelow(cidx) if one_below.parent() == QModelIndex(): # if the next node up's parent is the root we don't want to select # the next node up since it's a top level node and we only # select entries depth >= 2. # see if one more up is not the root and we can select it. two_below = self.indexBelow(one_below) if two_below != QModelIndex(): return two_below else: return cidx return one_below # just next down def select_from_idx( self, idx: QModelIndex, ) -> QStandardItem: ''' Select and return the item at index ``idx``. ''' sel = self.selectionModel() model = self.model() sel.setCurrentIndex( idx, QItemSelectionModel.ClearAndSelect | QItemSelectionModel.Rows ) return model.itemFromIndex(idx) def select_first(self) -> QStandardItem: ''' Select the first depth >= 2 entry from the completer tree and return its item. ''' # ensure we're **not** selecting the first level parent node and # instead its child. model = self.model() for idx, item in self.iter_d1(): if model.rowCount(idx) == 0: continue else: return self.select_from_idx(self.indexBelow(idx)) def select_next(self) -> QStandardItem: idx = self.next_index() assert idx.isValid() return self.select_from_idx(idx) def select_previous(self) -> QStandardItem: idx = self.previous_index() assert idx.isValid() return self.select_from_idx(idx) def next_section(self, direction: str = 'down') -> QModelIndex: cidx = start_idx = self.selectionModel().currentIndex() # step up levels to depth == 1 while cidx.parent() != QModelIndex(): cidx = cidx.parent() # move to next section in `direction` op = {'up': -1, 'down': +1}[direction] next_row = cidx.row() + op nidx = self.model().index(next_row, cidx.column(), QModelIndex()) # do nothing, if there is no valid "next" section if not nidx.isValid(): return self.select_from_idx(start_idx) # go to next selectable child item self.select_from_idx(nidx) return self.select_next() def iter_d1( self, ) -> tuple[QModelIndex, QStandardItem]: model = self.model() isections = model.rowCount() # much thanks to following code to figure out breadth-first # traversing from the root node: # https://stackoverflow.com/a/33126689 for i in range(isections): idx = model.index(i, 0, QModelIndex()) item = model.itemFromIndex(idx) yield idx, item def iter_df_rows( self, iparent: QModelIndex = QModelIndex(), ) -> Iterator[tuple[QModelIndex, QStandardItem]]: model = self.model() isections = model.rowCount(iparent) for i in range(isections): idx = model.index(i, 0, iparent) item = model.itemFromIndex(idx) yield idx, item if model.hasChildren(idx): # recursively yield child items depth-first yield from self.iter_df_rows(idx) def find_section( self, section: str, ) -> Optional[QModelIndex]: ''' Find the *first* depth = 1 section matching ``section`` in the tree and return its index. ''' for idx, item in self.iter_d1(): if item.text() == section: return idx else: # caller must expect his return None def clear_section( self, section: str, status_field: str = None, ) -> None: ''' Clear all result-rows from under the depth = 1 section. ''' idx = self.find_section(section) model = self.model() if idx is not None: if model.hasChildren(idx): rows = model.rowCount(idx) # print(f'removing {rows} from {section}') assert model.removeRows(0, rows, parent=idx) # remove section as well ? # model.removeRow(i, QModelIndex()) if status_field is not None: model.setItem(idx.row(), 1, QStandardItem(status_field)) else: model.setItem(idx.row(), 1, QStandardItem()) return idx else: return None def set_section_entries( self, section: str, values: Sequence[str], clear_all: bool = False, reverse: bool = False, ) -> None: ''' Set result-rows for depth = 1 tree section ``section``. ''' if ( values and not isinstance(values[0], str) ): flattened: list[str] = [] for val in values: flattened.extend(val) values = flattened if reverse: values = reversed(values) model = self.model() if clear_all: # XXX: rewrite the model from scratch if caller requests it model.clear() model.setHorizontalHeaderLabels(self.labels) section_idx = self.clear_section(section) # if we can't find a section start adding to the root if section_idx is None: root = model.invisibleRootItem() section_item = QStandardItem(section) blank = QStandardItem('') root.appendRow([section_item, blank]) else: section_item = model.itemFromIndex(section_idx) # values just needs to be sequence-like for i, s in enumerate(values): ix = QStandardItem(str(i)) item = QStandardItem(s) # Add the item to the model section_item.appendRow([ix, item]) self.expandAll() # TODO: figure out if we can avoid this line in a better way # such that "re-selection" doesn't happen tree-wise for each new # sub-search: # https://doc.qt.io/qt-5/model-view-programming.html#handling-selections-in-item-views # XXX: THE BELOW LINE MUST BE CALLED. # this stuff is super finicky and if not done right will cause # Qt crashes out our buttz. it's required in order to get the # view to show right after typing input. self.select_first() # TODO: the way we might be able to do this more sanely is, # 1. for the currently selected item, when start rewriting # a section figure out the row, column, parent "abstract" # position in the tree view and store it # 2. take that position and re-apply the selection to the new # model/tree by looking up the new "equivalent element" and # selecting self.show_matches() def show_matches( self, wh: Optional[tuple[float, float]] = None, ) -> None: if wh: self.resize_to_results(*wh) else: # case where it's just an update from results and *NOT* # a resize of some higher level parent-container widget. search = self.parent() w, h = search.space_dims() self.resize_to_results(w=w, h=h) self.show() class SearchBar(Edit): mode_name: str = 'search' def __init__( self, parent: QWidget, godwidget: QWidget, view: Optional[CompleterView] = None, **kwargs, ) -> None: self.godwidget = godwidget super().__init__(parent, **kwargs) self.view: CompleterView = view def unfocus(self) -> None: self.parent().hide() self.clearFocus() def hide(self) -> None: if self.view: self.view.hide() super().hide() class SearchWidget(QtWidgets.QWidget): ''' Composed widget of ``SearchBar`` + ``CompleterView``. Includes helper methods for item management in the sub-widgets. ''' mode_name: str = 'search' def __init__( self, godwidget: 'GodWidget', # type: ignore # noqa columns: list[str] = ['src', 'symbol'], parent=None, ) -> None: super().__init__(parent) # size it as we specify self.setSizePolicy( QtWidgets.QSizePolicy.Fixed, QtWidgets.QSizePolicy.Fixed, ) self.godwidget = godwidget godwidget.reg_for_resize(self) self.vbox = QtWidgets.QVBoxLayout(self) self.vbox.setContentsMargins(0, 4, 4, 0) self.vbox.setSpacing(4) # split layout for the (label:| search bar entry) self.bar_hbox = QtWidgets.QHBoxLayout() self.bar_hbox.setContentsMargins(0, 0, 0, 0) self.bar_hbox.setSpacing(4) # add label to left of search bar self.label = label = QtWidgets.QLabel(parent=self) label.setStyleSheet( f"""QLabel {{ color : {hcolor('default_lightest')}; font-size : {_font.px_size - 2}px; }} """ ) label.setTextFormat(3) # markdown label.setFont(_font.font) label.setMargin(4) label.setText("search:") label.show() label.setAlignment( QtCore.Qt.AlignVCenter | QtCore.Qt.AlignLeft ) self.bar_hbox.addWidget(label) self.view = CompleterView( parent=self, labels=columns, ) self.bar = SearchBar( parent=self, view=self.view, godwidget=godwidget, ) self.bar_hbox.addWidget(self.bar) self.vbox.addLayout(self.bar_hbox) self.vbox.setAlignment(self.bar, Qt.AlignTop | Qt.AlignRight) self.vbox.addWidget(self.bar.view) self.vbox.setAlignment(self.view, Qt.AlignTop | Qt.AlignLeft) def focus(self) -> None: self.show() self.bar.focus() def show_cache_entries( self, only: bool = False, keep_current_item_selected: bool = False, ) -> None: ''' Clear the search results view and show only cached (aka recently loaded with active data) feeds in the results section. ''' godw = self.godwidget # first entry in the cache is the current symbol(s) fqsns = set() for multi_fqsns in list(godw._chart_cache): for fqsn in set(multi_fqsns): fqsns.add(fqsn) if keep_current_item_selected: sel = self.view.selectionModel() cidx = sel.currentIndex() self.view.set_section_entries( 'cache', list(fqsns), # remove all other completion results except for cache clear_all=only, reverse=True, ) if ( keep_current_item_selected and cidx.isValid() ): # set current selection back to what it was before filling out # the view results. self.view.select_from_idx(cidx) else: self.view.select_first() def get_current_item(self) -> tuple[QModelIndex, str, str] | None: ''' Return the current completer tree selection as a tuple ``(parent: str, child: str)`` if valid, else ``None``. ''' model = self.view.model() sel = self.view.selectionModel() cidx = sel.currentIndex() # TODO: get rid of this hard coded column -> 1 # and use the ``CompleterView`` schema/settings # to figure out the desired field(s) # https://doc.qt.io/qt-5/qstandarditemmodel.html#itemFromIndex node = model.itemFromIndex(cidx.siblingAtColumn(1)) if node: symbol = node.text() try: provider = node.parent().text() except AttributeError: # no text set return None # TODO: move this to somewhere non-search machinery specific? if provider == 'cache': symbol, _, provider = symbol.rpartition('.') return ( cidx, provider, symbol, ) else: return None async def chart_current_item( self, clear_to_cache: bool = True, ) -> Optional[str]: ''' Attempt to load and switch the current selected completion result to the affiliated chart app. Return any loaded symbol. ''' value = self.get_current_item() if value is None: return None cidx, provider, symbol = value godw = self.godwidget fqsn = f'{symbol}.{provider}' log.info(f'Requesting symbol: {fqsn}') # assert provider in symbol await godw.load_symbols( fqsns=[fqsn], loglevel='info', ) # fully qualified symbol name (SNS i guess is what we're # making?) fqsn = '.'.join([symbol, provider]).lower() if clear_to_cache: self.bar.clear() # Re-order the symbol cache on the chart to display in # LIFO order. this is normally only done internally by # the chart on new symbols being loaded into memory godw.set_chart_symbols( (fqsn,), ( godw.hist_linked, godw.rt_linked, ) ) self.show_cache_entries( only=True, ) self.bar.focus() return fqsn def space_dims(self) -> tuple[float, float]: ''' Compute and return the "available space dimentions" for this search widget in terms of px space for results by return the pair of width and height. ''' # XXX: dun need dis rite? # win = self.window() # win_h = win.height() # sb_h = win.statusBar().height() godw = self.godwidget hl = godw.hist_linked edit_h = self.bar.height() h = hl.height() - edit_h w = hl.width() return w, h def on_resize(self) -> None: ''' Resize relay event from god, resize all child widgets. Right now this is just view to contents and/or the fast chart height. ''' w, h = self.space_dims() self.bar.view.show_matches(wh=(w, h)) _search_active: trio.Event = trio.Event() _search_enabled: bool = False async def pack_matches( view: CompleterView, has_results: dict[str, set[str]], matches: dict[(str, str), list[str]], provider: str, pattern: str, search: Callable[..., Awaitable[dict]], task_status: TaskStatus[ trio.CancelScope] = trio.TASK_STATUS_IGNORED, ) -> None: log.info(f'Searching {provider} for "{pattern}"') if provider != 'cache': # insert provider entries with search status view.set_section_entries( section=provider, values=[], ) view.clear_section(provider, status_field='-> searchin..') else: # for the cache just clear it's entries and don't put a status view.clear_section(provider) with trio.CancelScope() as cs: task_status.started(cs) # ensure ^ status is updated results = list(await search(pattern)) # XXX: don't cache the cache results xD if provider != 'cache': matches[(provider, pattern)] = results # print(f'results from {provider}: {results}') has_results[pattern].add(provider) if results: # display completion results view.set_section_entries( section=provider, values=results, ) else: view.clear_section(provider) async def fill_results( search: SearchBar, recv_chan: trio.abc.ReceiveChannel, # kb debouncing pauses (bracket defaults) min_pause_time: float = 0.01, # absolute min typing throttle # max pause required before slow relay max_pause_time: float = 6/16 + 0.001, ) -> None: ''' Task to search through providers and fill in possible completion results. ''' global _search_active, _search_enabled, _searcher_cache bar = search.bar view = bar.view view.select_from_idx(QModelIndex()) last_text = bar.text() repeats = 0 # cache of prior patterns to search results matches = defaultdict(list) has_results: defaultdict[str, set[str]] = defaultdict(set) # show cached feed list at startup search.show_cache_entries() search.on_resize() while True: await _search_active.wait() period = None while True: last_text = bar.text() wait_start = time.time() with trio.move_on_after(max_pause_time): pattern = await recv_chan.receive() period = time.time() - wait_start log.debug(f'{pattern} after {period}') # during fast multiple key inputs, wait until a pause # (in typing) to initiate search if period < min_pause_time: log.debug(f'Ignoring fast input for {pattern}') continue text = bar.text() # print(f'search: {text}') if not text or text.isspace(): # print('idling') _search_active = trio.Event() break if text == last_text: repeats += 1 if not _search_enabled: # print('search currently disabled') break already_has_results = has_results[text] log.debug(f'Search req for {text}') # issue multi-provider fan-out search request and place # "searching.." statuses on outstanding results providers async with trio.open_nursery() as n: for provider, (search, pause) in ( _searcher_cache.copy().items() ): # XXX: only conduct search on this backend if it's # registered for the corresponding pause period AND # it hasn't already been searched with the current # input pattern (in which case just look up the old # results). if ( period >= pause and provider not in already_has_results ): # TODO: it may make more sense TO NOT search the # cache in a bg task since we know it's fully # cpu-bound. if provider != 'cache': view.clear_section( provider, status_field='-> searchin..', ) await n.start( pack_matches, view, has_results, matches, provider, text, search ) else: # already has results for this input text results = matches[(provider, text)] # TODO really for the cache we need an # invalidation signal so that we only re-search # the cache once it's been mutated by the chart # switcher.. right now we're just always # re-searching it's ``dict`` since it's easier # but it also causes it to be slower then cached # results from other providers on occasion. if ( results ): if provider != 'cache': view.set_section_entries( section=provider, values=results, ) else: # if provider == 'cache': # for the cache just show what we got # that matches search.show_cache_entries() else: view.clear_section(provider) if repeats > 2 and period > max_pause_time: _search_active = trio.Event() repeats = 0 break bar.show() async def handle_keyboard_input( searchbar: SearchBar, recv_chan: trio.abc.ReceiveChannel, ) -> None: global _search_active, _search_enabled # startup searchw = searchbar.parent() godwidget = searchw.godwidget view = searchbar.view view.set_font_size(searchbar.dpi_font.px_size) send, recv = trio.open_memory_channel(616) async with trio.open_nursery() as n: # start a background multi-searcher task which receives # patterns relayed from this keyboard input handler and # async updates the completer view's results. n.start_soon( partial( fill_results, searchw, recv, ) ) searchbar.focus() searchw.show_cache_entries() await trio.sleep(0) async for kbmsg in recv_chan: event, etype, key, mods, txt = kbmsg.to_tuple() log.debug(f'key: {key}, mods: {mods}, txt: {txt}') ctl = False if mods == Qt.ControlModifier: ctl = True if key in ( Qt.Key_Enter, Qt.Key_Return ): _search_enabled = False await searchw.chart_current_item(clear_to_cache=True) # XXX: causes hang and segfault.. # searchw.show_cache_entries( # only=True, # keep_current_item_selected=True, # ) view.show_matches() searchw.focus() elif ( not ctl and not searchbar.text() ): # TODO: really should factor this somewhere..bc # we're doin it in another spot as well.. searchw.show_cache_entries(only=True) continue # cancel and close if ctl and key in { Qt.Key_C, Qt.Key_Space, # i feel like this is the "native" one Qt.Key_Alt, }: searchbar.unfocus() # kill the search and focus back on main chart if godwidget: godwidget.focus() continue if ( ctl and key in {Qt.Key_L} ): # like url (link) highlight in a web browser searchbar.focus() # selection navigation controls elif ( ctl and key in {Qt.Key_D} ): view.next_section(direction='down') _search_enabled = False elif ( ctl and key in {Qt.Key_U} ): view.next_section(direction='up') _search_enabled = False # selection navigation controls elif ( ctl and ( key in { Qt.Key_K, Qt.Key_J, } or key in { Qt.Key_Up, Qt.Key_Down, } ) ): _search_enabled = False if key in { Qt.Key_K, Qt.Key_Up }: item = view.select_previous() elif key in { Qt.Key_J, Qt.Key_Down, }: item = view.select_next() if item: parent_item = item.parent() # if we're in the cache section and thus the next # selection is a cache item, switch and show it # immediately since it should be very fast. if ( parent_item and parent_item.text() == 'cache' ): await searchw.chart_current_item(clear_to_cache=False) # ACTUAL SEARCH BLOCK # # where we fuzzy complete and fill out sections. elif not ctl: # relay to completer task _search_enabled = True send.send_nowait(searchw.bar.text()) _search_active.set() async def search_simple_dict( text: str, source: dict, ) -> dict[str, Any]: tokens = [] for key in source: if not isinstance(key, str): tokens.extend(key) else: tokens.append(key) # search routine can be specified as a function such # as in the case of the current app's local symbol cache matches = fuzzy.extractBests( text, tokens, score_cutoff=90, ) return [item[0] for item in matches] # cache of provider names to async search routines _searcher_cache: dict[str, Callable[..., Awaitable]] = {} @asynccontextmanager async def register_symbol_search( provider_name: str, search_routine: Callable, pause_period: Optional[float] = None, ) -> AsyncIterator[dict]: global _searcher_cache pause_period = pause_period or 0.1 # deliver search func to consumer try: _searcher_cache[provider_name] = (search_routine, pause_period) yield search_routine finally: _searcher_cache.pop(provider_name)