Use new PCN format for settings config file

This commit is contained in:
miruka
2020-10-07 20:12:32 -04:00
parent 6ce3059322
commit db12036372
52 changed files with 1305 additions and 409 deletions

View File

@@ -109,7 +109,7 @@ class Backend:
self.settings = Settings(self)
self.ui_state = UIState(self)
self.history = History(self)
self.theme = Theme(self, self.settings["theme"])
self.theme = Theme(self, self.settings.General.theme)
self.clients: Dict[str, MatrixClient] = {}
@@ -427,13 +427,13 @@ class Backend:
return path
async def get_settings(self) -> Tuple[Settings, UIState, History, str]:
async def get_settings(self) -> Tuple[dict, UIState, History, str]:
"""Return parsed user config files for QML."""
return (
self.settings.data,
self.ui_state.data,
self.history.data,
self.theme.data,
self.settings.qml_data,
self.ui_state.qml_data,
self.history.qml_data,
self.theme.qml_data,
)

View File

@@ -493,7 +493,7 @@ class MatrixClient(nio.AsyncClient):
utils.dict_update_recursive(first, self.low_limit_filter)
if self.backend.settings["hideUnknownEvents"]:
if not self.backend.settings.Chat.show_unknown_events:
first["room"]["timeline"]["not_types"].extend(
self.no_unknown_events_filter
["room"]["timeline"]["not_types"],
@@ -1146,14 +1146,22 @@ class MatrixClient(nio.AsyncClient):
room = self.models[self.user_id, "rooms"][room_id]
room.bookmarked = not room.bookmarked
settings = self.backend.ui_settings
bookmarks = settings["roomBookmarkIDs"].setdefault(self.user_id, [])
if room.bookmarked and room_id not in bookmarks:
bookmarks.append(room_id)
elif not room.bookmarked and room_id in bookmarks:
bookmarks.remove(room_id)
settings = self.backend.settings
bookmarks = settings.RoomList.bookmarks
user_bookmarks = bookmarks.setdefault(self.user_id, [])
await self.backend.ui_settings.write(self.backend.ui_settings._data)
if room.bookmarked and room_id not in user_bookmarks:
user_bookmarks.append(room_id)
while not room.bookmarked and room_id in user_bookmarks:
user_bookmarks.remove(room_id)
# Changes inside dicts/lists aren't monitored, need to reassign
settings.RoomList.bookmarks = {
**bookmarks, self.user_id: user_bookmarks,
}
self.backend.settings.save()
async def room_forget(self, room_id: str) -> None:
"""Leave a joined room (or decline an invite) and forget its history.
@@ -1866,7 +1874,7 @@ class MatrixClient(nio.AsyncClient):
)
unverified_devices = registered.unverified_devices
bookmarks = self.backend.ui_settings["roomBookmarkIDs"]
bookmarks = self.backend.settings.RoomList.bookmarks
room_item = Room(
id = room.room_id,
for_account = self.user_id,
@@ -1912,7 +1920,7 @@ class MatrixClient(nio.AsyncClient):
local_unreads = local_unreads,
local_highlights = local_highlights,
lexical_sorting = self.backend.settings["lexicalRoomSorting"],
lexical_sorting = self.backend.settings.RoomList.lexical_sort,
bookmarked = room.room_id in bookmarks.get(self.user_id, {}),
)

View File

@@ -498,7 +498,7 @@ class NioCallbacks:
# Membership changes
if not prev or membership != prev_membership:
if self.client.backend.settings["hideMembershipEvents"]:
if not self.client.backend.settings.Chat.show_membership_events:
return None
reason = escape(
@@ -564,7 +564,7 @@ class NioCallbacks:
avatar_url = now.get("avatar_url") or "",
)
if self.client.backend.settings["hideProfileChangeEvents"]:
if not self.client.backend.settings.Chat.show_profile_changes:
return None
return (
@@ -682,7 +682,7 @@ class NioCallbacks:
async def onUnknownEvent(
self, room: nio.MatrixRoom, ev: nio.UnknownEvent,
) -> None:
if self.client.backend.settings["hideUnknownEvents"]:
if not self.client.backend.settings.Chat.show_unknown_events:
await self.client.register_nio_room(room)
return

View File

@@ -0,0 +1,4 @@
# Copyright Mirage authors & contributors <https://github.com/mirukana/mirage>
# SPDX-License-Identifier: LGPL-3.0-or-later
"""Parse and operate on PCN (Python Config Notation) files."""

View File

@@ -0,0 +1,37 @@
from collections import UserDict
from typing import TYPE_CHECKING, Any, Dict, Iterator
if TYPE_CHECKING:
from .section import Section
PCN_GLOBALS: Dict[str, Any] = {}
class GlobalsDict(UserDict):
def __init__(self, section: "Section") -> None:
super().__init__()
self.section = section
@property
def full_dict(self) -> Dict[str, Any]:
return {
**PCN_GLOBALS,
**(self.section.root if self.section.root else {}),
**(self.section.root.globals if self.section.root else {}),
"self": self.section,
"parent": self.section.parent,
"root": self.section.parent,
**self.data,
}
def __getitem__(self, key: str) -> Any:
return self.full_dict[key]
def __iter__(self) -> Iterator[str]:
return iter(self.full_dict)
def __len__(self) -> int:
return len(self.full_dict)
def __repr__(self) -> str:
return repr(self.full_dict)

View File

@@ -0,0 +1,52 @@
import re
from dataclasses import dataclass, field
from typing import TYPE_CHECKING, Any, Callable, Dict, Type
if TYPE_CHECKING:
from .section import Section
TYPE_PROCESSORS: Dict[str, Callable[[Any], Any]] = {
"tuple": lambda v: tuple(v),
"set": lambda v: set(v),
}
class Unset:
pass
@dataclass
class Property:
name: str = field()
annotation: str = field()
expression: str = field()
section: "Section" = field()
value_override: Any = Unset
def __get__(self, obj: "Section", objtype: Type["Section"]) -> Any:
if not obj:
return self
if self.value_override is not Unset:
return self.value_override
env = obj.globals
result = eval(self.expression, dict(env), env) # nosec
return process_value(self.annotation, result)
def __set__(self, obj: "Section", value: Any) -> None:
self.value_override = value
obj._edited[self.name] = value
def process_value(annotation: str, value: Any) -> Any:
annotation = re.sub(r"\[.*\]$", "", annotation)
if annotation in TYPE_PROCESSORS:
return TYPE_PROCESSORS[annotation](value)
if annotation.lower() in TYPE_PROCESSORS:
return TYPE_PROCESSORS[annotation.lower()](value)
return value

395
src/backend/pcn/section.py Normal file
View File

@@ -0,0 +1,395 @@
import textwrap
from collections import OrderedDict
from collections.abc import MutableMapping
from dataclasses import dataclass, field
from operator import attrgetter
from pathlib import Path
from typing import (
Any, Callable, ClassVar, Dict, Generator, List, Optional, Set, Tuple, Type,
Union,
)
import redbaron as red
from .globals_dict import GlobalsDict
from .property import Property, process_value
# TODO: docstrings, error handling, support @property, non-section classes
BUILTINS_DIR: Path = Path(__file__).parent.parent.parent.resolve() # src dir
@dataclass(repr=False, eq=False)
class Section(MutableMapping):
sections: ClassVar[Set[str]] = set()
methods: ClassVar[Set[str]] = set()
properties: ClassVar[Set[str]] = set()
order: ClassVar[Dict[str, None]] = OrderedDict()
source_path: Optional[Path] = None
root: Optional["Section"] = None
parent: Optional["Section"] = None
builtins_path: Path = BUILTINS_DIR
globals: GlobalsDict = field(init=False)
_edited: Dict[str, Any] = field(init=False, default_factory=dict)
def __init_subclass__(cls, **kwargs) -> None:
# Make these attributes not shared between Section and its subclasses
cls.sections = set()
cls.methods = set()
cls.properties = set()
cls.order = OrderedDict()
for parent_class in cls.__bases__:
if not issubclass(parent_class, Section):
continue
cls.sections |= parent_class.sections # union operator
cls.methods |= parent_class.methods
cls.properties |= parent_class.properties
cls.order.update(parent_class.order)
super().__init_subclass__(**kwargs) # type: ignore
def __post_init__(self) -> None:
self.globals = GlobalsDict(self)
def __getattr__(self, name: str) -> Union["Section", Any]:
# This method signature tells mypy about the dynamic attribute types
# we can access. The body is run for attributes that aren't found.
return super().__getattribute__(name)
def __setattr__(self, name: str, value: Any) -> None:
# This method tells mypy about the dynamic attribute types we can set.
# The body is also run when setting an existing or new attribute.
if name in self.__dataclass_fields__:
super().__setattr__(name, value)
return
if name in self.properties:
value = process_value(getattr(type(self), name).annotation, value)
if self[name] == value:
return
getattr(type(self), name).value_override = value
self._edited[name] = value
return
if name in self.properties:
return
if name in self.sections:
raise NotImplementedError(f"cannot overwrite section {name!r}")
if name in self.methods:
raise NotImplementedError(f"cannot overwrite method {name!r}")
raise NotImplementedError(f"cannot add new attribute {name!r}")
def __delattr__(self, name: str) -> None:
raise NotImplementedError(f"cannot delete existing attribute {name!r}")
def __getitem__(self, key: str) -> Any:
try:
return getattr(self, key)
except AttributeError as err:
raise KeyError(str(err))
def __setitem__(self, key: str, value: Union["Section", str]) -> None:
setattr(self, key, value)
def __delitem__(self, key: str) -> None:
delattr(self, key)
def __iter__(self) -> Generator[str, None, None]:
for attr_name in self.order:
yield attr_name
def __len__(self) -> int:
return len(self.order)
def __eq__(self, obj: Any) -> bool:
if not isinstance(obj, Section):
return False
if self.globals.data != obj.globals.data or self.order != obj.order:
return False
return not any(self[attr] != obj[attr] for attr in self.order)
def __repr__(self) -> str:
name: str = type(self).__name__
children: List[str] = []
content: str = ""
newline: bool = False
for attr_name in self.order:
value = getattr(self, attr_name)
if attr_name in self.sections:
before = "\n" if children else ""
newline = True
try:
children.append(f"{before}{value!r},")
except RecursionError as err:
name = type(value).__name__
children.append(f"{before}{name}(\n {err!r}\n),")
pass
elif attr_name in self.methods:
before = "\n" if children else ""
newline = True
children.append(f"{before}def {value.__name__}(…),")
elif attr_name in self.properties:
before = "\n" if newline else ""
newline = False
try:
children.append(f"{before}{attr_name} = {value!r},")
except RecursionError as err:
children.append(f"{before}{attr_name} = {err!r},")
else:
newline = False
if children:
content = "\n%s\n" % textwrap.indent("\n".join(children), " " * 4)
return f"{name}({content})"
@classmethod
def _register_set_attr(cls, name: str, add_to_set_name: str) -> None:
cls.methods.discard(name)
cls.properties.discard(name)
cls.sections.discard(name)
getattr(cls, add_to_set_name).add(name)
cls.order[name] = None
for subclass in cls.__subclasses__():
subclass._register_set_attr(name, add_to_set_name)
def _set_section(self, section: "Section") -> None:
name = type(section).__name__
if hasattr(self, name) and name not in self.order:
raise AttributeError(f"{name!r}: forbidden name")
if name in self.sections:
self[name].deep_merge(section)
return
self._register_set_attr(name, "sections")
setattr(type(self), name, section)
def _set_method(self, name: str, method: Callable) -> None:
if hasattr(self, name) and name not in self.order:
raise AttributeError(f"{name!r}: forbidden name")
self._register_set_attr(name, "methods")
setattr(type(self), name, method)
def _set_property(
self, name: str, annotation: str, expression: str,
) -> None:
if hasattr(self, name) and name not in self.order:
raise AttributeError(f"{name!r}: forbidden name")
prop = Property(name, annotation, expression, self)
self._register_set_attr(name, "properties")
setattr(type(self), name, prop)
def deep_merge(self, section2: "Section") -> None:
for key in section2:
if key in self.sections and key in section2.sections:
self.globals.data.update(section2.globals.data)
self[key].deep_merge(section2[key])
elif key in section2.sections:
self.globals.data.update(section2.globals.data)
new_type = type(key, (Section,), {})
instance = new_type(
source_path = self.source_path,
root = self.root or self,
parent = self,
builtins_path = self.builtins_path,
)
self._set_section(instance)
instance.deep_merge(section2[key])
elif key in section2.methods:
self._set_method(key, section2[key])
else:
prop2 = getattr(type(section2), key)
self._set_property(key, prop2.annotation, prop2.expression)
def include_file(self, path: Union[Path, str]) -> None:
if not Path(path).is_absolute() and self.source_path:
path = self.source_path.parent / path
self.deep_merge(Section.from_file(path))
def include_builtin(self, relative_path: Union[Path, str]) -> None:
self.deep_merge(Section.from_file(self.builtins_path / relative_path))
def as_dict(self, _section: Optional["Section"] = None) -> Dict[str, Any]:
dct = {}
section = self if _section is None else _section
for key, value in section.items():
if isinstance(value, Section):
dct[key] = self.as_dict(value)
else:
dct[key] = value
return dct
def edits_as_dict(
self, _section: Optional["Section"] = None,
) -> Dict[str, Any]:
warning = (
"This file is generated when settings are changed from the GUI, "
"and properties in it override the ones in the corresponding "
"PCN user config file. "
"If a property is gets changed in the PCN file, any corresponding "
"property override here is removed."
)
if _section is None:
section = self
dct = {"__comment": warning, "set": section._edited.copy()}
add_to = dct["set"]
else:
section = _section
dct = {
prop_name: (
getattr(type(section), prop_name).expression,
value_override,
)
for prop_name, value_override in section._edited.items()
}
add_to = dct
for name in section.sections:
edits = section.edits_as_dict(section[name])
if edits:
add_to[name] = edits # type: ignore
return dct
def deep_merge_edits(
self, edits: Dict[str, Any], has_expressions: bool = True,
) -> None:
if not self.parent: # this is Root
edits = edits.get("set", {})
for name, value in edits.items():
if name not in self:
continue
if isinstance(self[name], Section) and isinstance(value, dict):
self[name].deep_merge_edits(value, has_expressions)
elif not has_expressions:
self[name] = value
elif isinstance(value, (tuple, list)):
user_expression, gui_value = value
if getattr(type(self), name).expression == user_expression:
self[name] = gui_value
@classmethod
def from_source_code(
cls,
code: str,
path: Optional[Path] = None,
builtins: Optional[Path] = None,
*,
inherit: Tuple[Type["Section"], ...] = (),
node: Union[None, red.RedBaron, red.ClassNode] = None,
name: str = "Root",
root: Optional["Section"] = None,
parent: Optional["Section"] = None,
) -> "Section":
builtins = builtins or BUILTINS_DIR
section: Type["Section"] = type(name, inherit or (Section,), {})
instance: Section = section(path, root, parent, builtins)
node = node or red.RedBaron(code)
for child in node.node_list:
if isinstance(child, red.ClassNode):
root_arg = instance if root is None else root
child_inherit = []
for name in child.inherit_from.dumps().split(","):
name = name.strip()
if root_arg is not None and name:
child_inherit.append(type(attrgetter(name)(root_arg)))
instance._set_section(section.from_source_code(
code = code,
path = path,
builtins = builtins,
inherit = tuple(child_inherit),
node = child,
name = child.name,
root = root_arg,
parent = instance,
))
elif isinstance(child, red.AssignmentNode):
instance._set_property(
child.target.value,
child.annotation.dumps() if child.annotation else "",
child.value.dumps(),
)
else:
env = instance.globals
exec(child.dumps(), dict(env), env) # nosec
if isinstance(child, red.DefNode):
instance._set_method(child.name, env[child.name])
return instance
@classmethod
def from_file(
cls, path: Union[str, Path], builtins: Union[str, Path] = BUILTINS_DIR,
) -> "Section":
path = Path(path)
return Section.from_source_code(path.read_text(), path, Path(builtins))

View File

@@ -6,20 +6,23 @@
import asyncio
import json
import os
import platform
import traceback
from collections.abc import MutableMapping
from dataclasses import dataclass, field
from pathlib import Path
from typing import TYPE_CHECKING, Any, ClassVar, Iterator, Optional, Tuple
from typing import (
TYPE_CHECKING, Any, ClassVar, Dict, Iterator, Optional, Tuple,
)
import aiofiles
from watchgod import Change, awatch
import pyotherside
from .pyotherside_events import UserFileChanged
from .pcn.section import Section
from .pyotherside_events import LoopException, UserFileChanged
from .theme_parser import convert_to_qml
from .utils import atomic_write, dict_update_recursive
from .utils import atomic_write, deep_serialize_for_qml, dict_update_recursive
if TYPE_CHECKING:
from .backend import Backend
@@ -34,20 +37,21 @@ class UserFile:
backend: "Backend" = field(repr=False)
filename: str = field()
data: Any = field(init=False, default_factory=dict)
_need_write: bool = field(init=False, default=False)
_wrote: bool = field(init=False, default=False)
data: Any = field(init=False, default_factory=dict)
_need_write: bool = field(init=False, default=False)
_mtime: Optional[float] = field(init=False, default=None)
_reader: Optional[asyncio.Future] = field(init=False, default=None)
_writer: Optional[asyncio.Future] = field(init=False, default=None)
def __post_init__(self) -> None:
try:
self.data, save = self.deserialized(self.path.read_text())
text_data = self.path.read_text()
except FileNotFoundError:
self.data = self.default_data
self._need_write = self.create_missing
else:
self.data, save = self.deserialized(text_data)
if save:
self.save()
@@ -56,14 +60,24 @@ class UserFile:
@property
def path(self) -> Path:
"""Full path of the file, can exist or not exist."""
"""Full path of the file to read, can exist or not exist."""
raise NotImplementedError()
@property
def write_path(self) -> Path:
"""Full path of the file to write, can exist or not exist."""
return self.path
@property
def default_data(self) -> Any:
"""Default deserialized content to use if the file doesn't exist."""
raise NotImplementedError()
@property
def qml_data(self) -> Any:
"""Data converted for usage in QML."""
return self.data
def deserialized(self, data: str) -> Tuple[Any, bool]:
"""Return parsed data from file text and whether to call `save()`."""
return (data, False)
@@ -76,6 +90,15 @@ class UserFile:
"""Inform the disk writer coroutine that the data has changed."""
self._need_write = True
def stop_watching(self) -> None:
"""Stop watching the on-disk file for changes."""
if self._reader:
self._reader.cancel()
if self._writer:
self._writer.cancel()
async def set_data(self, data: Any) -> None:
"""Set `data` and call `save()`, conveniance method for QML."""
self.data = data
@@ -88,45 +111,57 @@ class UserFile:
await asyncio.sleep(1)
async for changes in awatch(self.path):
ignored = 0
try:
ignored = 0
for change in changes:
if change[0] in (Change.added, Change.modified):
if self._need_write or self._wrote:
self._wrote = False
ignored += 1
continue
for change in changes:
mtime = self.path.stat().st_mtime
async with aiofiles.open(self.path) as file:
self.data, save = self.deserialized(await file.read())
if change[0] in (Change.added, Change.modified):
if mtime == self._mtime:
ignored += 1
continue
if save:
self.save()
async with aiofiles.open(self.path) as file:
text = await file.read()
self.data, save = self.deserialized(text)
elif change[0] == Change.deleted:
self._wrote = False
self.data = self.default_data
self._need_write = self.create_missing
if save:
self.save()
if changes and ignored < len(changes):
UserFileChanged(type(self), self.data)
elif change[0] == Change.deleted:
self.data = self.default_data
self._need_write = self.create_missing
self._mtime = mtime
if changes and ignored < len(changes):
UserFileChanged(type(self), self.qml_data)
except Exception as err:
LoopException(str(err), err, traceback.format_exc().rstrip())
async def _start_writer(self) -> None:
"""Disk writer coroutine, update the file with a 1 second cooldown."""
self.path.parent.mkdir(parents=True, exist_ok=True)
self.write_path.parent.mkdir(parents=True, exist_ok=True)
while True:
await asyncio.sleep(1)
if self._need_write:
async with atomic_write(self.path) as (new, done):
await new.write(self.serialized())
done()
try:
if self._need_write:
async with atomic_write(self.write_path) as (new, done):
await new.write(self.serialized())
done()
self._need_write = False
self._mtime = self.write_path.stat().st_mtime
except Exception as err:
self._need_write = False
self._wrote = True
LoopException(str(err), err, traceback.format_exc().rstrip())
@dataclass
@@ -156,6 +191,7 @@ class UserDataFile(UserFile):
@dataclass
class MappingFile(MutableMapping, UserFile):
"""A file manipulable like a dict. `data` must be a mutable mapping."""
def __getitem__(self, key: Any) -> Any:
return self.data[key]
@@ -171,6 +207,22 @@ class MappingFile(MutableMapping, UserFile):
def __len__(self) -> int:
return len(self.data)
def __getattr__(self, key: Any) -> Any:
try:
return self.data[key]
except KeyError:
return super().__getattribute__(key)
def __setattr__(self, key: Any, value: Any) -> None:
if key in self.__dataclass_fields__:
super().__setattr__(key, value)
return
self.data[key] = value
def __delattr__(self, key: Any) -> None:
del self.data[key]
@dataclass
class JSONFile(MappingFile):
@@ -180,7 +232,6 @@ class JSONFile(MappingFile):
def default_data(self) -> dict:
return {}
def deserialized(self, data: str) -> Tuple[dict, bool]:
"""Return parsed data from file text and whether to call `save()`.
@@ -193,12 +244,41 @@ class JSONFile(MappingFile):
dict_update_recursive(all_data, loaded)
return (all_data, loaded != all_data)
def serialized(self) -> str:
data = self.data
return json.dumps(data, indent=4, ensure_ascii=False, sort_keys=True)
@dataclass
class PCNFile(MappingFile):
"""File stored in the PCN format, with machine edits in a separate JSON."""
create_missing = False
@property
def write_path(self) -> Path:
"""Full path of file where programatically-done edits are stored."""
return self.path.with_suffix(".gui.json")
@property
def qml_data(self) -> Dict[str, Any]:
return deep_serialize_for_qml(self.data.as_dict()) # type: ignore
def deserialized(self, data: str) -> Tuple[Section, bool]:
root = Section.from_source_code(data, self.path)
edits = self.write_path.read_text() if self.write_path.exists() else ""
root.deep_merge_edits(json.loads(edits))
return (root, False)
def serialized(self) -> str:
edits = self.data.edits_as_dict()
return json.dumps(edits, indent=4, ensure_ascii=False)
async def set_data(self, data: Dict[str, Any]) -> None:
self.data.deep_merge_edits({"set": data}, has_expressions=False)
self.save()
@dataclass
class Accounts(ConfigFile, JSONFile):
"""Config file for saved matrix accounts: user ID, access tokens, etc"""
@@ -209,7 +289,6 @@ class Accounts(ConfigFile, JSONFile):
"""Return for QML whether there are any accounts saved on disk."""
return bool(self.data)
async def add(self, user_id: str) -> None:
"""Add an account to the config and write it on disk.
@@ -233,7 +312,6 @@ class Accounts(ConfigFile, JSONFile):
})
self.save()
async def set(
self,
user_id: str,
@@ -261,7 +339,6 @@ class Accounts(ConfigFile, JSONFile):
self.save()
async def forget(self, user_id: str) -> None:
"""Delete an account from the config and write it on disk."""
@@ -270,187 +347,33 @@ class Accounts(ConfigFile, JSONFile):
@dataclass
class Settings(ConfigFile, JSONFile):
class Settings(ConfigFile, PCNFile):
"""General config file for UI and backend settings"""
filename: str = "settings.json"
filename: str = "settings.py"
@property
def default_data(self) -> dict:
def ctrl_or_osx_ctrl() -> str:
# Meta in Qt corresponds to Ctrl on OSX
return "Meta" if platform.system() == "Darwin" else "Ctrl"
def default_data(self) -> Section:
root = Section.from_file("src/config/settings.py")
edits = "{}"
def alt_or_cmd() -> str:
# Ctrl in Qt corresponds to Cmd on OSX
return "Ctrl" if platform.system() == "Darwin" else "Alt"
if self.write_path.exists():
edits = self.write_path.read_text()
return {
"alertOnMentionForMsec": -1,
"alertOnMessageForMsec": 0,
"alwaysCenterRoomHeader": False,
# "autoHideScrollBarsAfterMsec": 2000,
"beUnavailableAfterSecondsIdle": 60 * 10,
"centerRoomListOnClick": False,
"compactMode": False,
"clearRoomFilterOnEnter": True,
"clearRoomFilterOnEscape": True,
"clearMemberFilterOnEscape": True,
"closeMinimizesToTray": False,
"collapseSidePanesUnderWindowWidth": 450,
"enableKineticScrolling": True,
"hideProfileChangeEvents": True,
"hideMembershipEvents": False,
"hideUnknownEvents": True,
"kineticScrollingMaxSpeed": 2500,
"kineticScrollingDeceleration": 1500,
"lexicalRoomSorting": False,
"markRoomReadMsecDelay": 200,
"maxMessageCharactersPerLine": 65,
"nonKineticScrollingSpeed": 1.0,
"ownMessagesOnLeftAboveWidth": 895,
"theme": "Midnight.qpl",
"writeAliases": {},
"zoom": 1.0,
"roomBookmarkIDs": {},
root.deep_merge_edits(json.loads(edits))
return root
"media": {
"autoLoad": True,
"autoPlay": False,
"autoPlayGIF": True,
"autoHideOSDAfterMsec": 3000,
"defaultVolume": 100,
"openExternallyOnClick": False,
"startMuted": False,
},
"keys": {
"startPythonDebugger": ["Alt+Shift+D"],
"toggleDebugConsole": ["Alt+Shift+C", "F1"],
def deserialized(self, data: str) -> Tuple[Section, bool]:
section, save = super().deserialized(data)
"zoomIn": ["Ctrl++"],
"zoomOut": ["Ctrl+-"],
"zoomReset": ["Ctrl+="],
"toggleCompactMode": ["Ctrl+Alt+C"],
"toggleHideRoomPane": ["Ctrl+Alt+R"],
"scrollUp": ["Alt+Up", "Alt+K"],
"scrollDown": ["Alt+Down", "Alt+J"],
"scrollPageUp": ["Alt+Ctrl+Up", "Alt+Ctrl+K", "PgUp"],
"scrollPageDown": ["Alt+Ctrl+Down", "Alt+Ctrl+J", "PgDown"],
"scrollToTop":
["Alt+Ctrl+Shift+Up", "Alt+Ctrl+Shift+K", "Home"],
"scrollToBottom":
["Alt+Ctrl+Shift+Down", "Alt+Ctrl+Shift+J", "End"],
"previousTab": ["Alt+Shift+Left", "Alt+Shift+H"],
"nextTab": ["Alt+Shift+Right", "Alt+Shift+L"],
"addNewAccount": ["Alt+Shift+A"],
"accountSettings": ["Alt+A"],
"addNewChat": ["Alt+C"],
"toggleFocusMainPane": ["Alt+F"],
"clearRoomFilter": ["Alt+Shift+F"],
"toggleCollapseAccount": ["Alt+O"],
"openPresenceMenu": ["Alt+P"],
"togglePresenceUnavailable": ["Alt+Ctrl+U", "Alt+Ctrl+A"],
"togglePresenceInvisible": ["Alt+Ctrl+I"],
"togglePresenceOffline": ["Alt+Ctrl+O"],
"goToLastPage": ["Ctrl+Tab"],
"goToPreviousAccount": ["Alt+Shift+N"],
"goToNextAccount": ["Alt+N"],
"goToPreviousRoom": ["Alt+Shift+Up", "Alt+Shift+K"],
"goToNextRoom": ["Alt+Shift+Down", "Alt+Shift+J"],
"goToPreviousUnreadRoom": ["Alt+Shift+U"],
"goToNextUnreadRoom": ["Alt+U"],
"goToPreviousMentionedRoom": ["Alt+Shift+M"],
"goToNextMentionedRoom": ["Alt+M"],
"focusAccountAtIndex": {
"01": f"{ctrl_or_osx_ctrl()}+1",
"02": f"{ctrl_or_osx_ctrl()}+2",
"03": f"{ctrl_or_osx_ctrl()}+3",
"04": f"{ctrl_or_osx_ctrl()}+4",
"05": f"{ctrl_or_osx_ctrl()}+5",
"06": f"{ctrl_or_osx_ctrl()}+6",
"07": f"{ctrl_or_osx_ctrl()}+7",
"08": f"{ctrl_or_osx_ctrl()}+8",
"09": f"{ctrl_or_osx_ctrl()}+9",
"10": f"{ctrl_or_osx_ctrl()}+0",
},
# On OSX, alt+numbers if used for symbols, use cmd instead
"focusRoomAtIndex": {
"01": f"{alt_or_cmd()}+1",
"02": f"{alt_or_cmd()}+2",
"03": f"{alt_or_cmd()}+3",
"04": f"{alt_or_cmd()}+4",
"05": f"{alt_or_cmd()}+5",
"06": f"{alt_or_cmd()}+6",
"07": f"{alt_or_cmd()}+7",
"08": f"{alt_or_cmd()}+8",
"09": f"{alt_or_cmd()}+9",
"10": f"{alt_or_cmd()}+0",
},
"unfocusOrDeselectAllMessages": ["Ctrl+D"],
"focusPreviousMessage": ["Ctrl+Up", "Ctrl+K"],
"focusNextMessage": ["Ctrl+Down", "Ctrl+J"],
"toggleSelectMessage": ["Ctrl+Space"],
"selectMessagesUntilHere": ["Ctrl+Shift+Space"],
"removeFocusedOrSelectedMessages": ["Ctrl+R", "Alt+Del"],
"replyToFocusedOrLastMessage": ["Ctrl+Q"], # Q → Quote
"debugFocusedMessage": ["Ctrl+Shift+D"],
"openMessagesLinksOrFiles": ["Ctrl+O"],
"openMessagesLinksOrFilesExternally": ["Ctrl+Shift+O"],
"copyFilesLocalPath": ["Ctrl+Shift+C"],
"clearRoomMessages": ["Ctrl+L"],
"sendFile": ["Alt+S"],
"sendFileFromPathInClipboard": ["Alt+Shift+S"],
"inviteToRoom": ["Alt+I"],
"leaveRoom": ["Alt+Escape"],
"forgetRoom": ["Alt+Shift+Escape"],
"toggleFocusRoomPane": ["Alt+R"],
"refreshDevices": ["Alt+R", "F5"],
"signOutCheckedOrAllDevices": ["Alt+S", "Delete"],
"imageViewer": {
"panLeft": ["H", "Left", "Alt+H", "Alt+Left"],
"panDown": ["J", "Down", "Alt+J", "Alt+Down"],
"panUp": ["K", "Up", "Alt+K", "Alt+Up"],
"panRight": ["L", "Right", "Alt+L", "Alt+Right"],
"zoomReset": ["Alt+Z", "=", "Ctrl+="],
"zoomOut": ["Shift+Z", "-", "Ctrl+-"],
"zoomIn": ["Z", "+", "Ctrl++"],
"rotateReset": ["Alt+R"],
"rotateLeft": ["Shift+R"],
"rotateRight": ["R"],
"resetSpeed": ["Alt+S"],
"previousSpeed": ["Shift+S"],
"nextSpeed": ["S"],
"pause": ["Space"],
"expand": ["E"],
"fullScreen": ["F", "F11", "Alt+Return", "Alt+Enter"],
"close": ["X", "Q"],
},
},
}
def deserialized(self, data: str) -> Tuple[dict, bool]:
dict_data, save = super().deserialized(data)
if "theme" in self and self["theme"] != dict_data["theme"]:
self.backend.theme = Theme(self.backend, dict_data["theme"])
if self and self.General.theme != section.General.theme:
self.backend.theme.stop_watching()
self.backend.theme = Theme(
self.backend, section.General.theme, # type: ignore
)
UserFileChanged(Theme, self.backend.theme.data)
return (dict_data, save)
return (section, save)
@dataclass

View File

@@ -12,14 +12,16 @@ import json
import sys
import xml.etree.cElementTree as xml_etree # FIXME: bandit warning
from concurrent.futures import ProcessPoolExecutor
from datetime import datetime, timedelta
from contextlib import suppress
from datetime import date, datetime, time, timedelta
from enum import Enum
from enum import auto as autostr
from pathlib import Path
from tempfile import NamedTemporaryFile
from types import ModuleType
from typing import (
Any, AsyncIterator, Callable, Dict, Mapping, Sequence, Tuple, Type, Union,
Any, AsyncIterator, Callable, Dict, Iterable, Mapping, Sequence, Tuple,
Type, Union,
)
from uuid import UUID
@@ -27,10 +29,9 @@ import aiofiles
import filetype
from aiofiles.threadpool.binary import AsyncBufferedReader
from aiofiles.threadpool.text import AsyncTextIOWrapper
from PIL import Image as PILImage
from nio.crypto import AsyncDataT as File
from nio.crypto import async_generator_from_data
from PIL import Image as PILImage
if sys.version_info >= (3, 7):
from contextlib import asynccontextmanager
@@ -145,14 +146,17 @@ def plain2html(text: str) -> str:
.replace("\t", "&nbsp;" * 4)
def serialize_value_for_qml(value: Any, json_list_dicts: bool = False) -> Any:
def serialize_value_for_qml(
value: Any, json_list_dicts: bool = False, reject_unknown: bool = False,
) -> Any:
"""Convert a value to make it easier to use from QML.
Returns:
- For `int`, `float`, `bool`, `str` and `datetime`: the unchanged value
- For `bool`, `int`, `float`, `bytes`, `str`, `datetime`, `date`, `time`:
the unchanged value (PyOtherSide handles these)
- For `Sequence` and `Mapping` subclasses (includes `list` and `dict`):
- For `Iterable` objects (includes `list` and `dict`):
a JSON dump if `json_list_dicts` is `True`, else the unchanged value
- If the value is an instancied object and has a `serialized` attribute or
@@ -168,10 +172,11 @@ def serialize_value_for_qml(value: Any, json_list_dicts: bool = False) -> Any:
- For class types: the class `__name__`
- For anything else: the unchanged value
- For anything else: raise a `TypeError` if `reject_unknown` is `True`,
else return the unchanged value.
"""
if isinstance(value, (int, float, bool, str, datetime)):
if isinstance(value, (bool, int, float, bytes, str, datetime, date, time)):
return value
if json_list_dicts and isinstance(value, (Sequence, Mapping)):
@@ -180,6 +185,9 @@ def serialize_value_for_qml(value: Any, json_list_dicts: bool = False) -> Any:
if not inspect.isclass(value) and hasattr(value, "serialized"):
return value.serialized
if isinstance(value, Iterable):
return value
if hasattr(value, "__class__") and issubclass(value.__class__, Enum):
return value.value
@@ -195,9 +203,43 @@ def serialize_value_for_qml(value: Any, json_list_dicts: bool = False) -> Any:
if inspect.isclass(value):
return value.__name__
if reject_unknown:
raise TypeError("Unknown type reject")
return value
def deep_serialize_for_qml(obj: Iterable) -> Union[list, dict]:
"""Recursively serialize lists and dict values for QML."""
if isinstance(obj, Mapping):
dct = {}
for key, value in obj.items():
if isinstance(value, Iterable) and not isinstance(value, str):
# PyOtherSide only accept dicts with string keys
dct[str(key)] = deep_serialize_for_qml(value)
continue
with suppress(TypeError):
dct[str(key)] = \
serialize_value_for_qml(value, reject_unknown=True)
return dct
lst = []
for value in obj:
if isinstance(value, Iterable) and not isinstance(value, str):
lst.append(deep_serialize_for_qml(value))
continue
with suppress(TypeError):
lst.append(serialize_value_for_qml(value, reject_unknown=True))
return lst
def classes_defined_in(module: ModuleType) -> Dict[str, Type]:
"""Return a `{name: class}` dict of all the classes a module defines."""