From a2e5d07b2c17d974fd90b72342daa94cfddd471b Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 20 Nov 2019 19:38:39 -0500 Subject: [PATCH] Port to mainline kivy and Python 3.8 This required some copy-paste of code from @matham's branch: https://github.com/kivy/kivy/pull/5241 namely, the stuff in the `utils_async.py` module. I've added all that as a standalone file for now. Update the pipfile to use `kivy`'s master branch (since there seems to be some lingering cython issues in the current release wheels). --- Pipfile | 5 +- piker/ui/kivy/utils_async.py | 230 +++++++++++++++++++++++++++++++++++ piker/ui/pager.py | 8 +- 3 files changed, 238 insertions(+), 5 deletions(-) create mode 100644 piker/ui/kivy/utils_async.py diff --git a/Pipfile b/Pipfile index 04bf8dff..9c2dc6d0 100644 --- a/Pipfile +++ b/Pipfile @@ -7,8 +7,8 @@ name = "pypi" e1839a8 = {path = ".",editable = true} trio = "*" Cython = "*" -# matham's next-gen async port of kivy -Kivy = {editable = true,git = "git://github.com/matham/kivy.git",ref = "async-loop"} +# use master branch kivy since wheels seem borked (due to cython stuff) +kivy = {git = "git://github.com/kivy/kivy.git"} pdbpp = "*" msgpack = "*" tractor = {git = "git://github.com/goodboy/tractor.git"} @@ -19,3 +19,4 @@ pyside2 = "*" [dev-packages] pytest = "*" pdbpp = "*" +piker = {editable = true,path = "."} diff --git a/piker/ui/kivy/utils_async.py b/piker/ui/kivy/utils_async.py new file mode 100644 index 00000000..fd4b82da --- /dev/null +++ b/piker/ui/kivy/utils_async.py @@ -0,0 +1,230 @@ +'''Async version of :mod:`kivy.utils`. +=========================================== +''' + +import os +from collections import deque + +if os.environ.get('KIVY_EVENTLOOP', 'default') == 'trio': + import trio + async_lib = trio +else: + import asyncio + async_lib = asyncio + trio = None + + +class AsyncCallbackQueue(object): + '''A class for asynchronously iterating values in a queue and waiting + for the queue to be updated with new values through a callback function. + + An instance is an async iterator which for every iteration waits for + callbacks to add values to the queue and then returns it. + + :Parameters: + + `filter`: callable or None + A callable that is called with :meth:`callback`'s positional + arguments. When provided, if it returns false, this call is dropped. + `convert`: callable or None + A callable that is called with :meth:`callback`'s positional + arguments. It is called immediately as opposed to async. + If provided, the return value of convert is returned by + the iterator rather than the original value. Helpful + for callback values that need to be processed immediately. + `maxlen`: int or None + If None, the callback queue may grow to an arbitrary length. + Otherwise, it is bounded to maxlen. Once it's full, when new items + are added a corresponding number of oldest items are discarded. + `thread_fn`: callable or None + If reading from the queue is done with a different thread than + writing it, this is the callback that schedules in the read thread. + + .. versionadded:: 1.10.1 + ''' + + quit = False + + def __init__(self, filter=None, convert=None, maxlen=None, thread_fn=None, + **kwargs): + super(AsyncCallbackQueue, self).__init__(**kwargs) + self.filter = filter + self.convert = convert + self.callback_result = deque(maxlen=maxlen) + self.thread_fn = thread_fn + self.event = async_lib.Event() + + def __del__(self): + self.stop() + + def __aiter__(self): + return self + + async def __anext__(self): + self.event.clear() + while not self.callback_result and not self.quit: + await self.event.wait() + self.event.clear() + + if self.callback_result: + return self.callback_result.popleft() + raise StopAsyncIteration + + def _thread_reentry(self, *largs, **kwargs): + self.event.set() + + def callback(self, *args): + '''This (and only this) function may be executed from another thread + because the callback may be bound to code executing from an external + thread. + ''' + f = self.filter + if self.quit or f and not f(*args): + return + + convert = self.convert + if convert: + args = convert(*args) + + self.callback_result.append(args) + + thread_fn = self.thread_fn + if thread_fn is None: + self.event.set() + else: + thread_fn(self._thread_reentry) + + def stop(self): + '''Stops the iterator and cleans up. + ''' + self.quit = True + self.event.set() + + +def _report_kivy_back_in_trio_thread_fn(task_container): + # This function gets scheduled into the trio run loop to deliver the + # thread's result. + def do_release_then_return_result(): + # if canceled, do the cancellation otherwise the result. + if task_container[1] is not None: + task_container[1]() + return task_container[2].unwrap() + + result = trio.hazmat.Result.capture(do_release_then_return_result) + trio.hazmat.reschedule(task_container[0], result) + + +async def trio_run_in_kivy_thread(sync_fn, *args, cancellable=False): + '''When canceled, executed work is discarded. + ''' + if trio is None: + raise Exception('trio is required but was not found') + task_container = [None, ] * 4 + + def kivy_thread_callback(*largs): + # This is the function that runs in the worker thread to do the actual + # work and then schedule the calls to report_back_in_trio_thread_fn + if task_container[1] is None: + task_container[2] = trio.hazmat.Result.capture(sync_fn, *args) + + try: + task_container[3].run_sync_soon( + _report_kivy_back_in_trio_thread_fn, task_container) + except trio.RunFinishedError: + # The entire run finished, so our particular tasks are certainly + # long gone - it must have cancelled. Continue eating the queue. + raise # pass + + @trio.hazmat.enable_ki_protection + async def schedule_kivy_thread(): + await trio.hazmat.checkpoint_if_cancelled() + # Holds a reference to the task that's blocked in this function waiting + # for the result as well as to the cancel callback and the result + # (when not canceled). + task_container[0] = trio.hazmat.current_task() + task_container[3] = trio.hazmat.current_trio_token() + from kivy.clock import Clock + Clock.schedule_once(kivy_thread_callback, 0) + + def abort(raise_cancel): + if cancellable: + task_container[1] = raise_cancel + return trio.hazmat.Abort.FAILED + return await trio.hazmat.wait_task_rescheduled(abort) + + return await schedule_kivy_thread() + + +class AsyncBindQueue(AsyncCallbackQueue): + '''A class for asynchronously observing kivy properties and events. + Creates an async iterator which for every iteration waits and + returns the property or event value for every time the property changes + or the event is dispatched. + The returned value is identical to the list of values passed to a function + bound to the event or property with bind. So at minimum it's a one element + (for events) or two element (for properties, instance and value) list. + :Parameters: + `bound_obj`: :class:`EventDispatcher` + The :class:`EventDispatcher` instance that contains the property + or event being observed. + `bound_name`: str + The property or event name to observe. + `current`: bool + Whether the iterator should return the current value on its + first class (True) or wait for the first event/property dispatch + before having a value (False). Defaults to True. + E.g.:: + async for x, y in AsyncBindQueue( + bound_obj=widget, bound_name='size', convert=lambda x: x[1]): + print(value) + Or:: + async for touch in AsyncBindQueue( + bound_obj=widget, bound_name='on_touch_down', + convert=lambda x: x[0]): + print(value) + .. versionadded:: 1.10.1 + ''' + + bound_obj = None + + bound_name = '' + + bound_uid = 0 + + def __init__(self, bound_obj, bound_name, current=True, **kwargs): + super(AsyncBindQueue, self).__init__(**kwargs) + self.bound_name = bound_name + self.bound_obj = bound_obj + + uid = self.bound_uid = bound_obj.fbind(bound_name, self.callback) + if not uid: + raise ValueError( + '{} is not a recognized property or event of {}' + ''.format(bound_name, bound_obj)) + + if current and not bound_obj.is_event_type(bound_name): + args = bound_obj, getattr(bound_obj, bound_name) + + f = self.filter + if not f or f(args): + convert = self.convert + if convert: + args = convert(args) + self.callback_result.append(args) + + def stop(self): + super(AsyncBindQueue, self).stop() + if self.bound_uid: + self.bound_obj.unbind_uid(self.bound_name, self.bound_uid) + self.bound_uid = 0 + self.bound_obj = None + + +def async_bind(self, bound_name, current=True, **kwargs): + '''A convenience method that returns a :class:`AsyncBindQueue` instance + initialized with the function parameters. :class:`AsyncBindQueue` + can also be instantiated directly. + .. versionadded:: 1.10.1 + ''' + return AsyncBindQueue( + bound_obj=self, bound_name=bound_name, current=current, **kwargs) diff --git a/piker/ui/pager.py b/piker/ui/pager.py index 7900f60a..ca17935b 100644 --- a/piker/ui/pager.py +++ b/piker/ui/pager.py @@ -10,6 +10,8 @@ from kivy.uix.textinput import TextInput from kivy.uix.scrollview import ScrollView from ..log import get_logger +from .kivy.utils_async import async_bind + log = get_logger('keyboard') @@ -33,7 +35,7 @@ async def handle_input( last_patt = [] kb = Window.request_keyboard(_kb_closed, widget, 'text') - keyq = kb.async_bind('on_key_down') + keyq = async_bind(kb, 'on_key_down') while True: async for kb, keycode, text, modifiers in keyq: @@ -79,7 +81,7 @@ async def handle_input( log.debug("Restarting keyboard handling loop") # rebind to avoid capturing keys processed by ^ coro - keyq = kb.async_bind('on_key_down') + keyq = async_bind(kb, 'on_key_down') log.debug("Exitting keyboard handling loop") @@ -178,7 +180,7 @@ class SearchBar(TextInput): self.select_all() # wait for to close search bar - await self.async_bind('on_text_validate').__aiter__().__anext__() + await async_bind(self, 'on_text_validate').__aiter__().__anext__() log.debug(f"Seach text is {self.text}") log.debug("Closing search bar")