Extend Qt event relaying
Add an `open_handler()` ctx manager for wholesale handling event sets with a passed in async func. Better document and implement the event filtering core including adding support for key "auto repeat" filtering; it turns out the events delivered when `trio` does its guest-most tick are not the same (Qt has somehow consumed them or something) so we have to do certain things (like getting the `.type()`, `.isAutoRepeat()`, etc.) before shipping over the mem chan. The alt might be to copy the event objects first but haven't tried it yet. For now just offer auto-repeat filtering through a flag.backup_asyncify_input_modes
parent
df2f6487ff
commit
4759e79d3d
|
@ -19,27 +19,41 @@ Qt event proxying and processing using ``trio`` mem chans.
|
|||
|
||||
"""
|
||||
from contextlib import asynccontextmanager
|
||||
from typing import Callable
|
||||
|
||||
from PyQt5 import QtCore, QtGui
|
||||
from PyQt5 import QtCore
|
||||
from PyQt5.QtCore import QEvent
|
||||
from PyQt5.QtGui import QWidget
|
||||
import trio
|
||||
|
||||
|
||||
class EventCloner(QtCore.QObject):
|
||||
"""Clone and forward keyboard events over a trio memory channel
|
||||
for later async processing.
|
||||
class EventRelay(QtCore.QObject):
|
||||
'''
|
||||
Relay Qt events over a trio memory channel for async processing.
|
||||
|
||||
"""
|
||||
'''
|
||||
_event_types: set[QEvent] = set()
|
||||
_send_chan: trio.abc.SendChannel = None
|
||||
_filter_auto_repeats: bool = True
|
||||
|
||||
def eventFilter(
|
||||
self,
|
||||
source: QtGui.QWidget,
|
||||
source: QWidget,
|
||||
ev: QEvent,
|
||||
) -> None:
|
||||
'''
|
||||
Qt global event filter: return `False` to pass through and `True`
|
||||
to filter event out.
|
||||
|
||||
if ev.type() in self._event_types:
|
||||
https://doc.qt.io/qt-5/qobject.html#eventFilter
|
||||
https://doc.qt.io/qtforpython/overviews/eventsandfilters.html#event-filters
|
||||
|
||||
'''
|
||||
etype = ev.type()
|
||||
# print(f'etype: {etype}')
|
||||
|
||||
if etype in self._event_types:
|
||||
# ev.accept()
|
||||
|
||||
# TODO: what's the right way to allow this?
|
||||
# if ev.isAutoRepeat():
|
||||
|
@ -51,35 +65,54 @@ class EventCloner(QtCore.QObject):
|
|||
# something to do with Qt internals and calling the
|
||||
# parent handler?
|
||||
|
||||
key = ev.key()
|
||||
mods = ev.modifiers()
|
||||
txt = ev.text()
|
||||
if etype in {QEvent.KeyPress, QEvent.KeyRelease}:
|
||||
|
||||
# run async processing
|
||||
self._send_chan.send_nowait((ev, key, mods, txt))
|
||||
# TODO: is there a global setting for this?
|
||||
if ev.isAutoRepeat() and self._filter_auto_repeats:
|
||||
ev.ignore()
|
||||
return True
|
||||
|
||||
# never intercept the event
|
||||
key = ev.key()
|
||||
mods = ev.modifiers()
|
||||
txt = ev.text()
|
||||
|
||||
# NOTE: the event object instance coming out
|
||||
# the other side is mutated since Qt resumes event
|
||||
# processing **before** running a ``trio`` guest mode
|
||||
# tick, thus special handling or copying must be done.
|
||||
|
||||
# send elements to async handler
|
||||
self._send_chan.send_nowait((ev, etype, key, mods, txt))
|
||||
|
||||
else:
|
||||
# send event to async handler
|
||||
self._send_chan.send_nowait(ev)
|
||||
|
||||
# **do not** filter out this event
|
||||
# and instead forward to the source widget
|
||||
return False
|
||||
|
||||
# filter out this event
|
||||
# https://doc.qt.io/qt-5/qobject.html#installEventFilter
|
||||
return False
|
||||
|
||||
|
||||
@asynccontextmanager
|
||||
async def open_key_stream(
|
||||
async def open_event_stream(
|
||||
|
||||
source_widget: QtGui.QWidget,
|
||||
source_widget: QWidget,
|
||||
event_types: set[QEvent] = {QEvent.KeyPress},
|
||||
|
||||
# TODO: should we offer some kinda option for toggling releases?
|
||||
# would it require a channel per event type?
|
||||
# QEvent.KeyRelease,
|
||||
filter_auto_repeats: bool = True,
|
||||
|
||||
) -> trio.abc.ReceiveChannel:
|
||||
|
||||
# 1 to force eager sending
|
||||
send, recv = trio.open_memory_channel(16)
|
||||
|
||||
kc = EventCloner()
|
||||
kc = EventRelay()
|
||||
kc._send_chan = send
|
||||
kc._event_types = event_types
|
||||
kc._filter_auto_repeats = filter_auto_repeats
|
||||
|
||||
source_widget.installEventFilter(kc)
|
||||
|
||||
|
@ -89,3 +122,22 @@ async def open_key_stream(
|
|||
finally:
|
||||
await send.aclose()
|
||||
source_widget.removeEventFilter(kc)
|
||||
|
||||
|
||||
@asynccontextmanager
|
||||
async def open_handler(
|
||||
|
||||
source_widget: QWidget,
|
||||
event_types: set[QEvent],
|
||||
async_handler: Callable[[QWidget, trio.abc.ReceiveChannel], None],
|
||||
**kwargs,
|
||||
|
||||
) -> None:
|
||||
|
||||
async with (
|
||||
trio.open_nursery() as n,
|
||||
open_event_stream(source_widget, event_types, **kwargs) as event_recv_stream,
|
||||
):
|
||||
n.start_soon(async_handler, source_widget, event_recv_stream)
|
||||
yield
|
||||
n.cancel_scope.cancel()
|
||||
|
|
Loading…
Reference in New Issue