444 lines
13 KiB
Python
444 lines
13 KiB
Python
import logging
|
|
import textwrap
|
|
from typing import (
|
|
Any, Callable, Dict, Iterable, List, Mapping, MutableSequence, Optional,
|
|
Sequence, Set, Tuple, Union
|
|
)
|
|
|
|
from PyQt5.QtCore import (
|
|
QAbstractListModel, QModelIndex, QObject, Qt, pyqtProperty, pyqtSignal,
|
|
pyqtSlot
|
|
)
|
|
|
|
from .list_item import ListItem
|
|
|
|
Index = Union[int, str]
|
|
NewItem = Union[ListItem, Mapping[str, Any], Sequence]
|
|
|
|
|
|
class _GetFail:
|
|
pass
|
|
|
|
|
|
class _PopFail:
|
|
pass
|
|
|
|
|
|
class ListModel(QAbstractListModel):
|
|
rolesSet = pyqtSignal()
|
|
changed = pyqtSignal()
|
|
countChanged = pyqtSignal(int)
|
|
|
|
def __init__(self,
|
|
initial_data: Optional[List[NewItem]] = None,
|
|
container: Callable[..., MutableSequence] = list,
|
|
default_factory: Optional[Callable[[str], ListItem]] = None,
|
|
parent: QObject = None) -> None:
|
|
super().__init__(parent)
|
|
self._data: MutableSequence[ListItem] = container()
|
|
|
|
self.default_factory = default_factory
|
|
|
|
if initial_data:
|
|
self.extend(initial_data)
|
|
|
|
|
|
def __repr__(self) -> str:
|
|
if not self._data:
|
|
return "\033[35m%s\033[0m()" % type(self).__name__
|
|
|
|
return "\033[35m%s\033[0m(\n%s\n)" % (
|
|
type(self).__name__,
|
|
textwrap.indent(
|
|
",\n".join((repr(item) for item in self._data)),
|
|
prefix = " " * 4,
|
|
)
|
|
)
|
|
|
|
|
|
def __contains__(self, index: Index) -> bool:
|
|
if isinstance(index, str):
|
|
try:
|
|
self.indexWhere(index)
|
|
return True
|
|
except ValueError:
|
|
return False
|
|
|
|
return index in self._data
|
|
|
|
|
|
def __getitem__(self, index: Index) -> ListItem:
|
|
return self.get(index)
|
|
|
|
|
|
def __setitem__(self, index: Index, value: NewItem) -> None:
|
|
self.set(index, value)
|
|
|
|
|
|
def __delitem__(self, index: Index) -> None:
|
|
self.remove(index)
|
|
|
|
|
|
def __len__(self) -> int:
|
|
return len(self._data)
|
|
|
|
|
|
def __iter__(self) -> Iterable[NewItem]:
|
|
return iter(self._data)
|
|
|
|
|
|
def __bool__(self) -> bool:
|
|
return bool(self._data)
|
|
|
|
|
|
@pyqtSlot(result=str)
|
|
def repr(self) -> str:
|
|
return self.__repr__()
|
|
|
|
|
|
@pyqtProperty("QStringList", notify=rolesSet)
|
|
def roles(self) -> Tuple[str, ...]:
|
|
return self._data[0].roles if self._data else () # type: ignore
|
|
|
|
|
|
@pyqtProperty("QVariant", notify=rolesSet)
|
|
def mainKey(self) -> Optional[str]:
|
|
return self._data[0].mainKey if self._data else None
|
|
|
|
|
|
def roleNumbers(self) -> Dict[str, int]:
|
|
return {name: Qt.UserRole + i
|
|
for i, name in enumerate(self.roles, 1)} \
|
|
if self._data else {}
|
|
|
|
|
|
def roleNames(self) -> Dict[int, bytes]:
|
|
return {Qt.UserRole + i: bytes(name, "utf-8")
|
|
for i, name in enumerate(self.roles, 1)} \
|
|
if self._data else {}
|
|
|
|
|
|
def data(self, index: QModelIndex, role: int = Qt.DisplayRole) -> Any:
|
|
if role <= Qt.UserRole:
|
|
return None
|
|
|
|
return getattr(self._data[index.row()],
|
|
str(self.roleNames()[role], "utf8"))
|
|
|
|
|
|
def rowCount(self, _: QModelIndex = QModelIndex()) -> int:
|
|
return len(self)
|
|
|
|
|
|
def _convert_new_value(self, value: NewItem) -> ListItem:
|
|
def convert() -> ListItem:
|
|
if self._data and isinstance(value, Mapping):
|
|
if not set(value.keys()) <= set(self.roles):
|
|
raise ValueError(
|
|
f"{value}: must have all these keys: {self.roles}"
|
|
)
|
|
|
|
return type(self._data[0])(**value)
|
|
|
|
if not self._data and isinstance(value, Mapping):
|
|
raise NotImplementedError("First item must be set from Python")
|
|
|
|
if self._data and isinstance(value, type(self._data[0])):
|
|
return value
|
|
|
|
if not self._data and isinstance(value, ListItem):
|
|
return value
|
|
|
|
raise TypeError("%r: must be mapping or %s" % (
|
|
value,
|
|
type(self._data[0]).__name__ if self._data else "ListItem"
|
|
))
|
|
|
|
value = convert()
|
|
value.setParent(self)
|
|
return value
|
|
|
|
|
|
@pyqtProperty(int, notify=countChanged)
|
|
def count(self) -> int:
|
|
return len(self)
|
|
|
|
|
|
@pyqtSlot("QVariant", result=int)
|
|
def indexWhere(self,
|
|
main_key_is_value: Any,
|
|
_can_use_default_factory: bool = True) -> int:
|
|
|
|
for i, item in enumerate(self._data):
|
|
if getattr(item, self.mainKey) == main_key_is_value:
|
|
return i
|
|
|
|
if _can_use_default_factory and self.default_factory:
|
|
return self.append(self.default_factory(main_key_is_value))
|
|
|
|
raise ValueError(
|
|
f"No item in model data with "
|
|
f"property {self.mainKey} is set to {main_key_is_value!r}."
|
|
)
|
|
|
|
|
|
@pyqtSlot(int, result="QVariant")
|
|
@pyqtSlot(str, result="QVariant")
|
|
@pyqtSlot(int, "QVariant", result="QVariant")
|
|
@pyqtSlot(str, "QVariant", result="QVariant")
|
|
def get(self, index: Index, default: Any = _GetFail()) -> ListItem:
|
|
try:
|
|
i_index: int = \
|
|
self.indexWhere(index, _can_use_default_factory=False) \
|
|
if isinstance(index, str) else index
|
|
|
|
return self._data[i_index]
|
|
|
|
except (ValueError, IndexError):
|
|
if isinstance(default, _GetFail):
|
|
if self.default_factory and isinstance(index, str):
|
|
item = self.default_factory(index)
|
|
self.append(item)
|
|
return item
|
|
raise
|
|
|
|
return default
|
|
|
|
|
|
@pyqtSlot(int, "QVariantMap", result=int)
|
|
def insert(self, index: int, value: NewItem) -> int:
|
|
value = self._convert_new_value(value)
|
|
|
|
try:
|
|
present_index = self.indexWhere(
|
|
main_key_is_value = getattr(value, self.mainKey),
|
|
_can_use_default_factory = False
|
|
)
|
|
except (TypeError, ValueError): # TypeError = no items in model
|
|
pass
|
|
else:
|
|
logging.warning(
|
|
"Duplicate mainKey %r in model - present: %r, inserting: %r",
|
|
self.mainKey,
|
|
self[present_index],
|
|
value
|
|
)
|
|
|
|
self.beginInsertRows(QModelIndex(), index, index)
|
|
|
|
had_data = bool(self._data)
|
|
self._data.insert(index, value)
|
|
if not had_data:
|
|
self.rolesSet.emit()
|
|
|
|
self.endInsertRows()
|
|
|
|
self.countChanged.emit(len(self))
|
|
self.changed.emit()
|
|
return index
|
|
|
|
|
|
@pyqtSlot("QVariantMap", result=int)
|
|
def append(self, value: NewItem) -> int:
|
|
return self.insert(len(self), value)
|
|
|
|
|
|
@pyqtSlot(list)
|
|
def extend(self, values: Iterable[NewItem]) -> None:
|
|
for val in values:
|
|
self.append(val)
|
|
|
|
|
|
@pyqtSlot(list)
|
|
@pyqtSlot(list, bool)
|
|
def updateAll(self, items: Sequence[NewItem], delete: bool = False
|
|
) -> None:
|
|
items_: List[ListItem] = [self._convert_new_value(i) for i in items]
|
|
|
|
if delete:
|
|
present_item: ListItem
|
|
for i, present_item in enumerate(self):
|
|
present_item_key = getattr(present_item, self.mainKey)
|
|
|
|
# If this present item is in the update items, based on mainKey
|
|
for update_item in items_:
|
|
if present_item_key == getattr(update_item, self.mainKey):
|
|
break
|
|
else:
|
|
del self[i]
|
|
|
|
for item in items_:
|
|
self.upsert(
|
|
where_main_key_is = getattr(item, item.mainKey),
|
|
update_with = item
|
|
)
|
|
|
|
|
|
@pyqtSlot(int, "QVariantMap", result=int)
|
|
@pyqtSlot(int, "QVariantMap", "QStringList", result=int)
|
|
@pyqtSlot(str, "QVariantMap", result=int)
|
|
@pyqtSlot(str, "QVariantMap", "QStringList", result=int)
|
|
def updateItem(self,
|
|
index: Index,
|
|
value: NewItem,
|
|
no_update: Sequence[str] = ()) -> int:
|
|
value = self._convert_new_value(value)
|
|
|
|
i_index: int = self.indexWhere(index, _can_use_default_factory=False) \
|
|
if isinstance(index, str) else index
|
|
|
|
to_update = self[i_index]
|
|
|
|
updated_roles: Set[int] = set()
|
|
|
|
for role_name, role_num in self.roleNumbers().items():
|
|
if role_name not in no_update:
|
|
old_value = getattr(to_update, role_name)
|
|
new_value = getattr(value, role_name)
|
|
|
|
if old_value != new_value:
|
|
try:
|
|
setattr(to_update, role_name, new_value)
|
|
except AttributeError: # constant/not settable
|
|
pass
|
|
else:
|
|
updated_roles.add(role_num)
|
|
|
|
if updated_roles:
|
|
qidx = QAbstractListModel.index(self, i_index, 0)
|
|
self.dataChanged.emit(qidx, qidx, updated_roles)
|
|
self.changed.emit()
|
|
|
|
return i_index
|
|
|
|
|
|
@pyqtSlot(str, "QVariantMap")
|
|
@pyqtSlot(str, "QVariantMap", int)
|
|
@pyqtSlot(str, "QVariantMap", int, int)
|
|
@pyqtSlot(str, "QVariantMap", int, int, "QStringList")
|
|
def upsert(self,
|
|
where_main_key_is: Any,
|
|
update_with: NewItem,
|
|
new_index_if_insert: Optional[int] = None,
|
|
new_index_if_update: Optional[int] = None,
|
|
no_update: Sequence[str] = ()) -> None:
|
|
try:
|
|
index = self.updateItem(
|
|
where_main_key_is, update_with, no_update
|
|
)
|
|
except (IndexError, ValueError):
|
|
self.insert(new_index_if_insert or len(self), update_with)
|
|
else:
|
|
if new_index_if_update:
|
|
self.move(index, new_index_if_update)
|
|
|
|
|
|
@pyqtSlot(int, list)
|
|
@pyqtSlot(str, list)
|
|
def set(self, index: Index, value: NewItem) -> None:
|
|
i_index: int = self.indexWhere(index) \
|
|
if isinstance(index, str) else index
|
|
|
|
qidx = QAbstractListModel.index(self, i_index, 0)
|
|
value = self._convert_new_value(value)
|
|
self._data[i_index] = value
|
|
self.dataChanged.emit(qidx, qidx, self.roleNames())
|
|
self.changed.emit()
|
|
|
|
|
|
@pyqtSlot(int, str, "QVariant")
|
|
@pyqtSlot(str, str, "QVariant")
|
|
def setProperty(self, index: Index, prop: str, value: Any) -> None:
|
|
i_index: int = self.indexWhere(index) \
|
|
if isinstance(index, str) else index
|
|
|
|
if getattr(self[i_index], prop) != value:
|
|
setattr(self[i_index], prop, value)
|
|
qidx = QAbstractListModel.index(self, i_index, 0)
|
|
self.dataChanged.emit(qidx, qidx, (self.roleNumbers()[prop],))
|
|
self.changed.emit()
|
|
|
|
|
|
@pyqtSlot(int, int)
|
|
@pyqtSlot(int, int, int)
|
|
@pyqtSlot(str, int)
|
|
@pyqtSlot(str, int, int)
|
|
def move(self, from_: Index, to: int, n: int = 1) -> None:
|
|
# pylint: disable=invalid-name
|
|
i_from: int = self.indexWhere(from_) \
|
|
if isinstance(from_, str) else from_
|
|
|
|
qlast = i_from + n - 1
|
|
|
|
if (n <= 0) or (i_from == to) or (qlast == to) or \
|
|
not (len(self) > qlast >= 0) or \
|
|
not len(self) >= to >= 0:
|
|
return
|
|
|
|
qidx = QModelIndex()
|
|
qto = min(len(self), to + n if to > i_from else to)
|
|
# print(f"self.beginMoveRows(qidx, {i_from}, {qlast}, qidx, {qto})")
|
|
valid = self.beginMoveRows(qidx, i_from, qlast, qidx, qto)
|
|
|
|
if not valid:
|
|
logging.warning("Invalid move operation - %r", locals())
|
|
return
|
|
|
|
last = i_from + n
|
|
cut = self._data[i_from:last]
|
|
del self._data[i_from:last]
|
|
self._data[to:to] = cut
|
|
|
|
self.endMoveRows()
|
|
self.changed.emit()
|
|
|
|
|
|
@pyqtSlot(int)
|
|
@pyqtSlot(str)
|
|
def remove(self, index: Index) -> None:
|
|
i_index: int = self.indexWhere(index) \
|
|
if isinstance(index, str) else index
|
|
|
|
self.beginRemoveRows(QModelIndex(), i_index, i_index)
|
|
del self._data[i_index]
|
|
self.endRemoveRows()
|
|
|
|
self.countChanged.emit(len(self))
|
|
self.changed.emit()
|
|
|
|
|
|
@pyqtSlot(int, result="QVariant")
|
|
@pyqtSlot(str, result="QVariant")
|
|
def pop(self, index: Index, default: Any = _PopFail()) -> ListItem:
|
|
try:
|
|
i_index: int = self.indexWhere(index) \
|
|
if isinstance(index, str) else index
|
|
item = self[i_index]
|
|
|
|
except (ValueError, IndexError):
|
|
if isinstance(default, _PopFail):
|
|
raise
|
|
return default
|
|
|
|
self.beginRemoveRows(QModelIndex(), i_index, i_index)
|
|
del self._data[i_index]
|
|
self.endRemoveRows()
|
|
|
|
self.countChanged.emit(len(self))
|
|
self.changed.emit()
|
|
return item
|
|
|
|
|
|
@pyqtSlot()
|
|
def clear(self) -> None:
|
|
if not self._data:
|
|
return
|
|
|
|
# Reimplemented for performance reasons (begin/endRemoveRows)
|
|
self.beginRemoveRows(QModelIndex(), 0, len(self))
|
|
self._data.clear()
|
|
self.endRemoveRows()
|
|
|
|
self.countChanged.emit(len(self))
|
|
self.changed.emit()
|