Add "group statuses" support to status bar

Allows for submitting a top level "group status" associated with
a "group key" which eventually resolves once all sub-statuses associated
with that group key (and thus top level status) complete and are also
removed. Also add support for a "final message" for each status such
that once the status clear callback is called a final msg is placed on
the status bar that is then removed when the next status is set.

It's all a questionable bunch of closures/callbacks but it worx.
asyncify_input_modes
Tyler Goodlet 2021-06-17 16:53:19 -04:00
parent 53074b552a
commit 0133f0b589
1 changed files with 80 additions and 5 deletions

View File

@ -21,7 +21,8 @@ Qt main window singletons and stuff.
import os
import signal
import time
from typing import Callable
from typing import Callable, Optional, Union
import uuid
from pyqtgraph import QtGui
from PyQt5 import QtCore
@ -42,23 +43,97 @@ class MultiStatus:
def __init__(self, bar, statuses) -> None:
self.bar = bar
self.statuses = statuses
# self._clear_last: Optional[Callable[..., None]] = None
self._to_clear: set = set()
self._status_groups: dict[str, (set, Callable)] = {}
def open_status(
self,
msg: str,
) -> Callable[..., None]:
final_msg: Optional[str] = None,
clear_on_next: bool = False,
group_key: Optional[Union[bool, str]] = False,
) -> Union[Callable[..., None], str]:
'''Add a status to the status bar and return a close callback which
when called will remove the status ``msg``.
'''
for msg in self._to_clear:
try:
self.statuses.remove(msg)
except ValueError:
pass
self.statuses.append(msg)
def remove_msg() -> None:
self.statuses.remove(msg)
self.render()
try:
self.statuses.remove(msg)
self.render()
except ValueError:
pass
if final_msg is not None:
self.statuses.append(final_msg)
self.render()
self._to_clear.add(final_msg)
ret = remove_msg
# create a "status group" such that new `.open_status()`
# calls can be made passing in the returned group key.
# once all clear callbacks have been called from all statuses
# in the group the final status msg to be removed will be the one
# the one provided when `group_key=True`, this way you can
# create a long living status that completes once all
# sub-statuses have finished.
if group_key is True:
if clear_on_next:
ValueError("Can't create group status and clear it on next?")
# generate a key for a new "status group"
new_group_key = str(uuid.uuid4())
def pop_group_and_clear():
subs, final_clear = self._status_groups.pop(new_group_key)
assert not subs
return remove_msg()
self._status_groups[new_group_key] = (set(), pop_group_and_clear)
ret = new_group_key
elif group_key:
def pop_from_group_and_maybe_clear_group():
# remove the message for this sub-status
remove_msg()
# check to see if all other substatuses have cleared
group_tup = self._status_groups.get(group_key)
if group_tup:
subs, group_clear = group_tup
try:
subs.remove(msg)
except KeyError:
raise KeyError(f'no msg {msg} for group {group_key}!?')
if not subs:
group_clear()
self._status_groups[group_key][0].add(msg)
ret = pop_from_group_and_maybe_clear_group
if clear_on_next:
self._to_clear.add(msg)
self.render()
return remove_msg
return ret
def render(self) -> None:
if self.statuses: