Extend Qt event relaying
Add an `open_handler()` ctx manager for wholesale handling event sets with a passed in async func. Better document and implement the event filtering core including adding support for key "auto repeat" filtering; it turns out the events delivered when `trio` does its guest-most tick are not the same (Qt has somehow consumed them or something) so we have to do certain things (like getting the `.type()`, `.isAutoRepeat()`, etc.) before shipping over the mem chan. The alt might be to copy the event objects first but haven't tried it yet. For now just offer auto-repeat filtering through a flag.asyncify_input_modes
parent
e4518c59af
commit
85621af8af
|
@ -19,27 +19,41 @@ Qt event proxying and processing using ``trio`` mem chans.
|
||||||
|
|
||||||
"""
|
"""
|
||||||
from contextlib import asynccontextmanager
|
from contextlib import asynccontextmanager
|
||||||
|
from typing import Callable
|
||||||
|
|
||||||
from PyQt5 import QtCore, QtGui
|
from PyQt5 import QtCore
|
||||||
from PyQt5.QtCore import QEvent
|
from PyQt5.QtCore import QEvent
|
||||||
|
from PyQt5.QtGui import QWidget
|
||||||
import trio
|
import trio
|
||||||
|
|
||||||
|
|
||||||
class EventCloner(QtCore.QObject):
|
class EventRelay(QtCore.QObject):
|
||||||
"""Clone and forward keyboard events over a trio memory channel
|
'''
|
||||||
for later async processing.
|
Relay Qt events over a trio memory channel for async processing.
|
||||||
|
|
||||||
"""
|
'''
|
||||||
_event_types: set[QEvent] = set()
|
_event_types: set[QEvent] = set()
|
||||||
_send_chan: trio.abc.SendChannel = None
|
_send_chan: trio.abc.SendChannel = None
|
||||||
|
_filter_auto_repeats: bool = True
|
||||||
|
|
||||||
def eventFilter(
|
def eventFilter(
|
||||||
self,
|
self,
|
||||||
source: QtGui.QWidget,
|
source: QWidget,
|
||||||
ev: QEvent,
|
ev: QEvent,
|
||||||
) -> None:
|
) -> None:
|
||||||
|
'''
|
||||||
|
Qt global event filter: return `False` to pass through and `True`
|
||||||
|
to filter event out.
|
||||||
|
|
||||||
if ev.type() in self._event_types:
|
https://doc.qt.io/qt-5/qobject.html#eventFilter
|
||||||
|
https://doc.qt.io/qtforpython/overviews/eventsandfilters.html#event-filters
|
||||||
|
|
||||||
|
'''
|
||||||
|
etype = ev.type()
|
||||||
|
# print(f'etype: {etype}')
|
||||||
|
|
||||||
|
if etype in self._event_types:
|
||||||
|
# ev.accept()
|
||||||
|
|
||||||
# TODO: what's the right way to allow this?
|
# TODO: what's the right way to allow this?
|
||||||
# if ev.isAutoRepeat():
|
# if ev.isAutoRepeat():
|
||||||
|
@ -51,35 +65,54 @@ class EventCloner(QtCore.QObject):
|
||||||
# something to do with Qt internals and calling the
|
# something to do with Qt internals and calling the
|
||||||
# parent handler?
|
# parent handler?
|
||||||
|
|
||||||
|
if etype in {QEvent.KeyPress, QEvent.KeyRelease}:
|
||||||
|
|
||||||
|
# TODO: is there a global setting for this?
|
||||||
|
if ev.isAutoRepeat() and self._filter_auto_repeats:
|
||||||
|
ev.ignore()
|
||||||
|
return True
|
||||||
|
|
||||||
key = ev.key()
|
key = ev.key()
|
||||||
mods = ev.modifiers()
|
mods = ev.modifiers()
|
||||||
txt = ev.text()
|
txt = ev.text()
|
||||||
|
|
||||||
# run async processing
|
# NOTE: the event object instance coming out
|
||||||
self._send_chan.send_nowait((ev, key, mods, txt))
|
# the other side is mutated since Qt resumes event
|
||||||
|
# processing **before** running a ``trio`` guest mode
|
||||||
|
# tick, thus special handling or copying must be done.
|
||||||
|
|
||||||
# never intercept the event
|
# send elements to async handler
|
||||||
|
self._send_chan.send_nowait((ev, etype, key, mods, txt))
|
||||||
|
|
||||||
|
else:
|
||||||
|
# send event to async handler
|
||||||
|
self._send_chan.send_nowait(ev)
|
||||||
|
|
||||||
|
# **do not** filter out this event
|
||||||
|
# and instead forward to the source widget
|
||||||
|
return False
|
||||||
|
|
||||||
|
# filter out this event
|
||||||
|
# https://doc.qt.io/qt-5/qobject.html#installEventFilter
|
||||||
return False
|
return False
|
||||||
|
|
||||||
|
|
||||||
@asynccontextmanager
|
@asynccontextmanager
|
||||||
async def open_key_stream(
|
async def open_event_stream(
|
||||||
|
|
||||||
source_widget: QtGui.QWidget,
|
source_widget: QWidget,
|
||||||
event_types: set[QEvent] = {QEvent.KeyPress},
|
event_types: set[QEvent] = {QEvent.KeyPress},
|
||||||
|
filter_auto_repeats: bool = True,
|
||||||
# TODO: should we offer some kinda option for toggling releases?
|
|
||||||
# would it require a channel per event type?
|
|
||||||
# QEvent.KeyRelease,
|
|
||||||
|
|
||||||
) -> trio.abc.ReceiveChannel:
|
) -> trio.abc.ReceiveChannel:
|
||||||
|
|
||||||
# 1 to force eager sending
|
# 1 to force eager sending
|
||||||
send, recv = trio.open_memory_channel(16)
|
send, recv = trio.open_memory_channel(16)
|
||||||
|
|
||||||
kc = EventCloner()
|
kc = EventRelay()
|
||||||
kc._send_chan = send
|
kc._send_chan = send
|
||||||
kc._event_types = event_types
|
kc._event_types = event_types
|
||||||
|
kc._filter_auto_repeats = filter_auto_repeats
|
||||||
|
|
||||||
source_widget.installEventFilter(kc)
|
source_widget.installEventFilter(kc)
|
||||||
|
|
||||||
|
@ -89,3 +122,22 @@ async def open_key_stream(
|
||||||
finally:
|
finally:
|
||||||
await send.aclose()
|
await send.aclose()
|
||||||
source_widget.removeEventFilter(kc)
|
source_widget.removeEventFilter(kc)
|
||||||
|
|
||||||
|
|
||||||
|
@asynccontextmanager
|
||||||
|
async def open_handler(
|
||||||
|
|
||||||
|
source_widget: QWidget,
|
||||||
|
event_types: set[QEvent],
|
||||||
|
async_handler: Callable[[QWidget, trio.abc.ReceiveChannel], None],
|
||||||
|
**kwargs,
|
||||||
|
|
||||||
|
) -> None:
|
||||||
|
|
||||||
|
async with (
|
||||||
|
trio.open_nursery() as n,
|
||||||
|
open_event_stream(source_widget, event_types, **kwargs) as event_recv_stream,
|
||||||
|
):
|
||||||
|
n.start_soon(async_handler, source_widget, event_recv_stream)
|
||||||
|
yield
|
||||||
|
n.cancel_scope.cancel()
|
||||||
|
|
Loading…
Reference in New Issue