piker/piker/ui/_search.py

946 lines
27 KiB
Python

# piker: trading gear for hackers
# Copyright (C) Tyler Goodlet (in stewardship for piker0)
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
qompleterz: embeddable search and complete using trio, Qt and fuzzywuzzy.
"""
# link set for hackzin on this stuff:
# https://doc.qt.io/qt-5/qheaderview.html#moving-header-sections
# https://doc.qt.io/qt-5/model-view-programming.html
# https://doc.qt.io/qt-5/modelview.html
# https://doc.qt.io/qt-5/qtreeview.html#selectedIndexes
# https://doc.qt.io/qt-5/qmodelindex.html#siblingAtColumn
# https://doc.qt.io/qt-5/qitemselectionmodel.html#currentIndex
# https://www.programcreek.com/python/example/108109/PyQt5.QtWidgets.QTreeView
# https://doc.qt.io/qt-5/qsyntaxhighlighter.html
# https://github.com/qutebrowser/qutebrowser/blob/master/qutebrowser/completion/completiondelegate.py#L243
# https://forum.qt.io/topic/61343/highlight-matched-substrings-in-qstyleditemdelegate
from collections import defaultdict
from contextlib import asynccontextmanager
from functools import partial
from typing import (
Optional, Callable,
Awaitable, Sequence,
Any, AsyncIterator
)
import time
# from pprint import pformat
from fuzzywuzzy import process as fuzzy
import trio
from trio_typing import TaskStatus
from PyQt5 import QtCore
from PyQt5 import QtWidgets
from PyQt5.QtCore import (
Qt,
# QSize,
QModelIndex,
QItemSelectionModel,
)
from PyQt5.QtGui import (
# QLayout,
QStandardItem,
QStandardItemModel,
)
from PyQt5.QtWidgets import (
QWidget,
QTreeView,
# QListWidgetItem,
# QAbstractScrollArea,
# QStyledItemDelegate,
)
from ..log import get_logger
from ._style import (
_font,
hcolor,
)
from ._forms import FontAndChartAwareLineEdit, FontScaledDelegate
log = get_logger(__name__)
class CompleterView(QTreeView):
mode_name: str = 'search-nav'
# XXX: relevant docs links:
# - simple widget version of this:
# https://doc.qt.io/qt-5/qtreewidget.html#details
# - MVC high level instructional:
# https://doc.qt.io/qt-5/model-view-programming.html
# - MV tut:
# https://doc.qt.io/qt-5/modelview.html
# - custome header view (for doing stuff like we have in kivy?):
# https://doc.qt.io/qt-5/qheaderview.html#moving-header-sections
# TODO: selection model stuff for eventual aggregate feeds
# charting and mgmt;
# https://doc.qt.io/qt-5/qabstractitemview.html#setSelectionModel
# https://doc.qt.io/qt-5/qitemselectionmodel.html
# https://doc.qt.io/qt-5/modelview.html#3-2-working-with-selections
# https://doc.qt.io/qt-5/model-view-programming.html#handling-selections-of-items
# TODO: mouse extended handling:
# https://doc.qt.io/qt-5/qabstractitemview.html#entered
def __init__(
self,
parent=None,
labels: list[str] = [],
) -> None:
super().__init__(parent)
model = QStandardItemModel(self)
self.labels = labels
# a std "tabular" config
self.setItemDelegate(FontScaledDelegate(self))
self.setModel(model)
self.setAlternatingRowColors(True)
# TODO: size this based on DPI font
self.setIndentation(20)
# self.setUniformRowHeights(True)
# self.setColumnWidth(0, 3)
# self.setVerticalBarPolicy(Qt.ScrollBarAlwaysOff)
# self.setSizeAdjustPolicy(QAbstractScrollArea.AdjustIgnored)
# ux settings
self.setItemsExpandable(True)
self.setExpandsOnDoubleClick(False)
self.setAnimated(False)
self.setHorizontalScrollBarPolicy(Qt.ScrollBarAlwaysOff)
# column headers
model.setHorizontalHeaderLabels(labels)
self._font_size: int = 0 # pixels
async def on_pressed(self, idx: QModelIndex) -> None:
'''Mouse pressed on view handler.
'''
search = self.parent()
await search.chart_current_item(clear_to_cache=False)
search.focus()
def set_font_size(self, size: int = 18):
# print(size)
if size < 0:
size = 16
self._font_size = size
self.setStyleSheet(f"font: {size}px")
def resize(self):
model = self.model()
cols = model.columnCount()
for i in range(cols):
self.resizeColumnToContents(i)
# inclusive of search bar and header "rows" in pixel terms
rows = 100
# max_rows = 8 # 6 + search and headers
row_px = self.rowHeight(self.currentIndex())
# print(f'font_h: {font_h}\n px_height: {px_height}')
# TODO: probably make this more general / less hacky
self.setMinimumSize(self.width(), rows * row_px)
self.setMaximumSize(self.width() + 10, rows * row_px)
self.setFixedWidth(333)
def is_selecting_d1(self) -> bool:
cidx = self.selectionModel().currentIndex()
return cidx.parent() == QModelIndex()
def previous_index(self) -> QModelIndex:
cidx = self.selectionModel().currentIndex()
one_above = self.indexAbove(cidx)
if one_above.parent() == QModelIndex():
# if the next node up's parent is the root we don't want to select
# the next node up since it's a top level node and we only
# select entries depth >= 2.
# see if one more up is not the root and we can select it.
two_above = self.indexAbove(one_above)
if two_above != QModelIndex():
return two_above
else:
return cidx
return one_above # just next up
def next_index(self) -> QModelIndex:
cidx = self.selectionModel().currentIndex()
one_below = self.indexBelow(cidx)
if one_below.parent() == QModelIndex():
# if the next node up's parent is the root we don't want to select
# the next node up since it's a top level node and we only
# select entries depth >= 2.
# see if one more up is not the root and we can select it.
two_below = self.indexBelow(one_below)
if two_below != QModelIndex():
return two_below
else:
return cidx
return one_below # just next down
def select_from_idx(
self,
idx: QModelIndex,
) -> QStandardItem:
'''Select and return the item at index ``idx``.
'''
sel = self.selectionModel()
model = self.model()
sel.setCurrentIndex(
idx,
QItemSelectionModel.ClearAndSelect |
QItemSelectionModel.Rows
)
return model.itemFromIndex(idx)
def select_first(self) -> QStandardItem:
'''Select the first depth >= 2 entry from the completer tree and
return it's item.
'''
# ensure we're **not** selecting the first level parent node and
# instead its child.
model = self.model()
for idx, item in self.iter_d1():
if model.rowCount(idx) == 0:
continue
else:
return self.select_from_idx(self.indexBelow(idx))
def select_next(self) -> QStandardItem:
idx = self.next_index()
assert idx.isValid()
return self.select_from_idx(idx)
def select_previous(self) -> QStandardItem:
idx = self.previous_index()
assert idx.isValid()
return self.select_from_idx(idx)
def next_section(self, direction: str = 'down') -> QModelIndex:
cidx = start_idx = self.selectionModel().currentIndex()
# step up levels to depth == 1
while cidx.parent() != QModelIndex():
cidx = cidx.parent()
# move to next section in `direction`
op = {'up': -1, 'down': +1}[direction]
next_row = cidx.row() + op
nidx = self.model().index(next_row, cidx.column(), QModelIndex())
# do nothing, if there is no valid "next" section
if not nidx.isValid():
return self.select_from_idx(start_idx)
# go to next selectable child item
self.select_from_idx(nidx)
return self.select_next()
def iter_d1(
self,
) -> tuple[QModelIndex, QStandardItem]:
model = self.model()
isections = model.rowCount()
# much thanks to following code to figure out breadth-first
# traversing from the root node:
# https://stackoverflow.com/a/33126689
for i in range(isections):
idx = model.index(i, 0, QModelIndex())
item = model.itemFromIndex(idx)
yield idx, item
def find_section(
self,
section: str,
) -> Optional[QModelIndex]:
'''Find the *first* depth = 1 section matching ``section`` in
the tree and return its index.
'''
for idx, item in self.iter_d1():
if item.text() == section:
return idx
else:
# caller must expect his
return None
def clear_section(
self,
section: str,
status_field: str = None,
) -> None:
'''Clear all result-rows from under the depth = 1 section.
'''
idx = self.find_section(section)
model = self.model()
if idx is not None:
if model.hasChildren(idx):
rows = model.rowCount(idx)
# print(f'removing {rows} from {section}')
assert model.removeRows(0, rows, parent=idx)
# remove section as well ?
# model.removeRow(i, QModelIndex())
if status_field is not None:
model.setItem(idx.row(), 1, QStandardItem(status_field))
else:
model.setItem(idx.row(), 1, QStandardItem())
self.resize()
return idx
else:
return None
def set_section_entries(
self,
section: str,
values: Sequence[str],
clear_all: bool = False,
) -> None:
'''Set result-rows for depth = 1 tree section ``section``.
'''
model = self.model()
if clear_all:
# XXX: rewrite the model from scratch if caller requests it
model.clear()
model.setHorizontalHeaderLabels(self.labels)
section_idx = self.clear_section(section)
# if we can't find a section start adding to the root
if section_idx is None:
root = model.invisibleRootItem()
section_item = QStandardItem(section)
blank = QStandardItem('')
root.appendRow([section_item, blank])
else:
section_item = model.itemFromIndex(section_idx)
# values just needs to be sequence-like
for i, s in enumerate(values):
ix = QStandardItem(str(i))
item = QStandardItem(s)
# Add the item to the model
section_item.appendRow([ix, item])
self.expandAll()
# TODO: figure out if we can avoid this line in a better way
# such that "re-selection" doesn't happen tree-wise for each new
# sub-search:
# https://doc.qt.io/qt-5/model-view-programming.html#handling-selections-in-item-views
# XXX: THE BELOW LINE MUST BE CALLED.
# this stuff is super finicky and if not done right will cause
# Qt crashes out our buttz. it's required in order to get the
# view to show right after typing input.
self.select_first()
# TODO: the way we might be able to do this more sanely is,
# 1. for the currently selected item, when start rewriting
# a section figure out the row, column, parent "abstract"
# position in the tree view and store it
# 2. take that position and re-apply the selection to the new
# model/tree by looking up the new "equivalent element" and
# selecting
self.show_matches()
def show_matches(self) -> None:
self.show()
self.resize()
class SearchBar(FontAndChartAwareLineEdit):
mode_name: str = 'search'
def __init__(
self,
parent: QWidget,
godwidget: QWidget,
view: Optional[CompleterView] = None,
**kwargs,
) -> None:
self.godwidget = godwidget
super().__init__(parent, **kwargs)
self.view: CompleterView = view
def show(self) -> None:
super().show()
self.view.show_matches()
def unfocus(self) -> None:
self.parent().hide()
self.clearFocus()
if self.view:
self.view.hide()
class SearchWidget(QtWidgets.QWidget):
'''Composed widget of ``SearchBar`` + ``CompleterView``.
Includes helper methods for item management in the sub-widgets.
'''
mode_name: str = 'search'
def __init__(
self,
godwidget: 'GodWidget', # type: ignore # noqa
columns: list[str] = ['src', 'symbol'],
parent=None,
) -> None:
super().__init__(parent or godwidget)
# size it as we specify
self.setSizePolicy(
QtWidgets.QSizePolicy.Fixed,
QtWidgets.QSizePolicy.Fixed,
)
self.godwidget = godwidget
self.vbox = QtWidgets.QVBoxLayout(self)
self.vbox.setContentsMargins(0, 4, 4, 0)
self.vbox.setSpacing(4)
# split layout for the (label:| search bar entry)
self.bar_hbox = QtWidgets.QHBoxLayout()
self.bar_hbox.setContentsMargins(0, 0, 0, 0)
self.bar_hbox.setSpacing(4)
# add label to left of search bar
self.label = label = QtWidgets.QLabel(parent=self)
label.setStyleSheet(
f"""QLabel {{
color : {hcolor('default_lightest')};
font-size : {_font.px_size - 2}px;
}}
"""
)
label.setTextFormat(3) # markdown
label.setFont(_font.font)
label.setMargin(4)
label.setText("search:")
label.show()
label.setAlignment(
QtCore.Qt.AlignVCenter
| QtCore.Qt.AlignLeft
)
self.bar_hbox.addWidget(label)
self.view = CompleterView(
parent=self,
labels=columns,
)
self.bar = SearchBar(
parent=self,
view=self.view,
godwidget=godwidget,
)
self.bar_hbox.addWidget(self.bar)
self.vbox.addLayout(self.bar_hbox)
self.vbox.setAlignment(self.bar, Qt.AlignTop | Qt.AlignRight)
self.vbox.addWidget(self.bar.view)
self.vbox.setAlignment(self.view, Qt.AlignTop | Qt.AlignLeft)
def focus(self) -> None:
if self.view.model().rowCount(QModelIndex()) == 0:
# fill cache list if nothing existing
self.view.set_section_entries(
'cache',
list(reversed(self.godwidget._chart_cache)),
clear_all=True,
)
self.bar.focus()
self.show()
def get_current_item(self) -> Optional[tuple[str, str]]:
'''Return the current completer tree selection as
a tuple ``(parent: str, child: str)`` if valid, else ``None``.
'''
model = self.view.model()
sel = self.view.selectionModel()
cidx = sel.currentIndex()
# TODO: get rid of this hard coded column -> 1
# and use the ``CompleterView`` schema/settings
# to figure out the desired field(s)
# https://doc.qt.io/qt-5/qstandarditemmodel.html#itemFromIndex
node = model.itemFromIndex(cidx.siblingAtColumn(1))
if node:
symbol = node.text()
try:
provider = node.parent().text()
except AttributeError:
# no text set
return None
# TODO: move this to somewhere non-search machinery specific?
if provider == 'cache':
symbol, _, provider = symbol.rpartition('.')
return provider, symbol
else:
return None
async def chart_current_item(
self,
clear_to_cache: bool = True,
) -> Optional[str]:
'''Attempt to load and switch the current selected
completion result to the affiliated chart app.
Return any loaded symbol.
'''
value = self.get_current_item()
if value is None:
return None
provider, symbol = value
chart = self.godwidget
log.info(f'Requesting symbol: {symbol}.{provider}')
await chart.load_symbol(
provider,
symbol,
'info',
)
# fully qualified symbol name (SNS i guess is what we're
# making?)
fqsn = '.'.join([symbol, provider]).lower()
if clear_to_cache:
self.bar.clear()
# Re-order the symbol cache on the chart to display in
# LIFO order. this is normally only done internally by
# the chart on new symbols being loaded into memory
chart.set_chart_symbol(fqsn, chart.linkedsplits)
self.view.set_section_entries(
'cache',
values=list(reversed(chart._chart_cache)),
# remove all other completion results except for cache
clear_all=True,
)
return fqsn
_search_active: trio.Event = trio.Event()
_search_enabled: bool = False
async def pack_matches(
view: CompleterView,
has_results: dict[str, set[str]],
matches: dict[(str, str), list[str]],
provider: str,
pattern: str,
search: Callable[..., Awaitable[dict]],
task_status: TaskStatus[
trio.CancelScope] = trio.TASK_STATUS_IGNORED,
) -> None:
log.info(f'Searching {provider} for "{pattern}"')
if provider != 'cache':
# insert provider entries with search status
view.set_section_entries(
section=provider,
values=[],
)
view.clear_section(provider, status_field='-> searchin..')
else: # for the cache just clear it's entries and don't put a status
view.clear_section(provider)
with trio.CancelScope() as cs:
task_status.started(cs)
# ensure ^ status is updated
results = await search(pattern)
if provider != 'cache': # XXX: don't cache the cache results xD
matches[(provider, pattern)] = results
# print(f'results from {provider}: {results}')
has_results[pattern].add(provider)
if results:
# display completion results
view.set_section_entries(
section=provider,
values=results,
)
else:
view.clear_section(provider)
async def fill_results(
search: SearchBar,
recv_chan: trio.abc.ReceiveChannel,
# kb debouncing pauses (bracket defaults)
min_pause_time: float = 0.01, # absolute min typing throttle
# max pause required before slow relay
max_pause_time: float = 6/16 + 0.001,
) -> None:
"""Task to search through providers and fill in possible
completion results.
"""
global _search_active, _search_enabled, _searcher_cache
bar = search.bar
view = bar.view
view.select_from_idx(QModelIndex())
last_text = bar.text()
repeats = 0
# cache of prior patterns to search results
matches = defaultdict(list)
has_results: defaultdict[str, set[str]] = defaultdict(set)
while True:
await _search_active.wait()
period = None
while True:
last_text = bar.text()
wait_start = time.time()
with trio.move_on_after(max_pause_time):
pattern = await recv_chan.receive()
period = time.time() - wait_start
print(f'{pattern} after {period}')
# during fast multiple key inputs, wait until a pause
# (in typing) to initiate search
if period < min_pause_time:
log.debug(f'Ignoring fast input for {pattern}')
continue
text = bar.text()
# print(f'search: {text}')
if not text or text.isspace():
# print('idling')
_search_active = trio.Event()
break
if text == last_text:
repeats += 1
if not _search_enabled:
# print('search currently disabled')
break
already_has_results = has_results[text]
log.debug(f'Search req for {text}')
# issue multi-provider fan-out search request and place
# "searching.." statuses on outstanding results providers
async with trio.open_nursery() as n:
for provider, (search, pause) in (
_searcher_cache.copy().items()
):
# XXX: only conduct search on this backend if it's
# registered for the corresponding pause period AND
# it hasn't already been searched with the current
# input pattern (in which case just look up the old
# results).
if (period >= pause) and (
provider not in already_has_results
):
# TODO: it may make more sense TO NOT search the
# cache in a bg task since we know it's fully
# cpu-bound.
if provider != 'cache':
view.clear_section(
provider, status_field='-> searchin..')
await n.start(
pack_matches,
view,
has_results,
matches,
provider,
text,
search
)
else: # already has results for this input text
results = matches[(provider, text)]
# TODO really for the cache we need an
# invalidation signal so that we only re-search
# the cache once it's been mutated by the chart
# switcher.. right now we're just always
# re-searching it's ``dict`` since it's easier
# but it also causes it to be slower then cached
# results from other providers on occasion.
if results and provider != 'cache':
view.set_section_entries(
section=provider,
values=results,
)
else:
view.clear_section(provider)
if repeats > 2 and period > max_pause_time:
_search_active = trio.Event()
repeats = 0
break
bar.show()
async def handle_keyboard_input(
searchbar: SearchBar,
recv_chan: trio.abc.ReceiveChannel,
) -> None:
global _search_active, _search_enabled
# startup
bar = searchbar
search = searchbar.parent()
godwidget = search.godwidget
view = bar.view
view.set_font_size(bar.dpi_font.px_size)
send, recv = trio.open_memory_channel(16)
async with trio.open_nursery() as n:
# start a background multi-searcher task which receives
# patterns relayed from this keyboard input handler and
# async updates the completer view's results.
n.start_soon(
partial(
fill_results,
search,
recv,
)
)
async for kbmsg in recv_chan:
event, etype, key, mods, txt = kbmsg.to_tuple()
log.debug(f'key: {key}, mods: {mods}, txt: {txt}')
ctl = False
if mods == Qt.ControlModifier:
ctl = True
if key in (Qt.Key_Enter, Qt.Key_Return):
await search.chart_current_item(clear_to_cache=True)
_search_enabled = False
continue
elif not ctl and not bar.text():
# if nothing in search text show the cache
view.set_section_entries(
'cache',
list(reversed(godwidget._chart_cache)),
clear_all=True,
)
continue
# cancel and close
if ctl and key in {
Qt.Key_C,
Qt.Key_Space, # i feel like this is the "native" one
Qt.Key_Alt,
}:
search.bar.unfocus()
# kill the search and focus back on main chart
if godwidget:
godwidget.focus()
continue
if ctl and key in {
Qt.Key_L,
}:
# like url (link) highlight in a web browser
bar.focus()
# selection navigation controls
elif ctl and key in {
Qt.Key_D,
}:
view.next_section(direction='down')
_search_enabled = False
elif ctl and key in {
Qt.Key_U,
}:
view.next_section(direction='up')
_search_enabled = False
# selection navigation controls
elif (ctl and key in {
Qt.Key_K,
Qt.Key_J,
}) or key in {
Qt.Key_Up,
Qt.Key_Down,
}:
_search_enabled = False
if key in {Qt.Key_K, Qt.Key_Up}:
item = view.select_previous()
elif key in {Qt.Key_J, Qt.Key_Down}:
item = view.select_next()
if item:
parent_item = item.parent()
if parent_item and parent_item.text() == 'cache':
# if it's a cache item, switch and show it immediately
await search.chart_current_item(clear_to_cache=False)
elif not ctl:
# relay to completer task
_search_enabled = True
send.send_nowait(search.bar.text())
_search_active.set()
async def search_simple_dict(
text: str,
source: dict,
) -> dict[str, Any]:
# search routine can be specified as a function such
# as in the case of the current app's local symbol cache
matches = fuzzy.extractBests(
text,
source.keys(),
score_cutoff=90,
)
return [item[0] for item in matches]
# cache of provider names to async search routines
_searcher_cache: dict[str, Callable[..., Awaitable]] = {}
@asynccontextmanager
async def register_symbol_search(
provider_name: str,
search_routine: Callable,
pause_period: Optional[float] = None,
) -> AsyncIterator[dict]:
global _searcher_cache
pause_period = pause_period or 0.1
# deliver search func to consumer
try:
_searcher_cache[provider_name] = (search_routine, pause_period)
yield search_routine
finally:
_searcher_cache.pop(provider_name)