Add "group statuses" support to status bar
Allows for submitting a top level "group status" associated with a "group key" which eventually resolves once all sub-statuses associated with that group key (and thus top level status) complete and are also removed. Also add support for a "final message" for each status such that once the status clear callback is called a final msg is placed on the status bar that is then removed when the next status is set. It's all a questionable bunch of closures/callbacks but it worx.backup_asyncify_input_modes
parent
095850c3ae
commit
fb040339e5
|
@ -21,7 +21,8 @@ Qt main window singletons and stuff.
|
|||
import os
|
||||
import signal
|
||||
import time
|
||||
from typing import Callable
|
||||
from typing import Callable, Optional, Union
|
||||
import uuid
|
||||
|
||||
from pyqtgraph import QtGui
|
||||
from PyQt5 import QtCore
|
||||
|
@ -42,23 +43,97 @@ class MultiStatus:
|
|||
def __init__(self, bar, statuses) -> None:
|
||||
self.bar = bar
|
||||
self.statuses = statuses
|
||||
# self._clear_last: Optional[Callable[..., None]] = None
|
||||
self._to_clear: set = set()
|
||||
self._status_groups: dict[str, (set, Callable)] = {}
|
||||
|
||||
def open_status(
|
||||
|
||||
self,
|
||||
msg: str,
|
||||
) -> Callable[..., None]:
|
||||
final_msg: Optional[str] = None,
|
||||
clear_on_next: bool = False,
|
||||
group_key: Optional[Union[bool, str]] = False,
|
||||
|
||||
) -> Union[Callable[..., None], str]:
|
||||
'''Add a status to the status bar and return a close callback which
|
||||
when called will remove the status ``msg``.
|
||||
|
||||
'''
|
||||
for msg in self._to_clear:
|
||||
try:
|
||||
self.statuses.remove(msg)
|
||||
except ValueError:
|
||||
pass
|
||||
|
||||
self.statuses.append(msg)
|
||||
|
||||
def remove_msg() -> None:
|
||||
self.statuses.remove(msg)
|
||||
self.render()
|
||||
try:
|
||||
self.statuses.remove(msg)
|
||||
self.render()
|
||||
except ValueError:
|
||||
pass
|
||||
|
||||
if final_msg is not None:
|
||||
self.statuses.append(final_msg)
|
||||
self.render()
|
||||
self._to_clear.add(final_msg)
|
||||
|
||||
|
||||
ret = remove_msg
|
||||
|
||||
# create a "status group" such that new `.open_status()`
|
||||
# calls can be made passing in the returned group key.
|
||||
# once all clear callbacks have been called from all statuses
|
||||
# in the group the final status msg to be removed will be the one
|
||||
# the one provided when `group_key=True`, this way you can
|
||||
# create a long living status that completes once all
|
||||
# sub-statuses have finished.
|
||||
if group_key is True:
|
||||
if clear_on_next:
|
||||
ValueError("Can't create group status and clear it on next?")
|
||||
|
||||
# generate a key for a new "status group"
|
||||
new_group_key = str(uuid.uuid4())
|
||||
|
||||
def pop_group_and_clear():
|
||||
|
||||
subs, final_clear = self._status_groups.pop(new_group_key)
|
||||
assert not subs
|
||||
return remove_msg()
|
||||
|
||||
self._status_groups[new_group_key] = (set(), pop_group_and_clear)
|
||||
ret = new_group_key
|
||||
|
||||
elif group_key:
|
||||
|
||||
def pop_from_group_and_maybe_clear_group():
|
||||
# remove the message for this sub-status
|
||||
remove_msg()
|
||||
|
||||
# check to see if all other substatuses have cleared
|
||||
group_tup = self._status_groups.get(group_key)
|
||||
|
||||
if group_tup:
|
||||
subs, group_clear = group_tup
|
||||
try:
|
||||
subs.remove(msg)
|
||||
except KeyError:
|
||||
raise KeyError(f'no msg {msg} for group {group_key}!?')
|
||||
|
||||
if not subs:
|
||||
group_clear()
|
||||
|
||||
self._status_groups[group_key][0].add(msg)
|
||||
ret = pop_from_group_and_maybe_clear_group
|
||||
|
||||
if clear_on_next:
|
||||
self._to_clear.add(msg)
|
||||
|
||||
self.render()
|
||||
return remove_msg
|
||||
|
||||
return ret
|
||||
|
||||
def render(self) -> None:
|
||||
if self.statuses:
|
||||
|
|
Loading…
Reference in New Issue