339 lines
9.5 KiB
Python
339 lines
9.5 KiB
Python
# SPDX-License-Identifier: LGPL-3.0-or-later
|
|
|
|
"""User data and configuration files definitions."""
|
|
|
|
import asyncio
|
|
import json
|
|
from dataclasses import dataclass, field
|
|
from pathlib import Path
|
|
from typing import TYPE_CHECKING, Any, ClassVar, Dict, Optional
|
|
|
|
import aiofiles
|
|
|
|
import pyotherside
|
|
|
|
from .theme_parser import convert_to_qml
|
|
from .utils import atomic_write, dict_update_recursive
|
|
|
|
if TYPE_CHECKING:
|
|
from .backend import Backend
|
|
|
|
JsonData = Dict[str, Any]
|
|
|
|
WRITE_LOCK = asyncio.Lock()
|
|
|
|
|
|
@dataclass
|
|
class DataFile:
|
|
"""Base class representing a user data file."""
|
|
|
|
is_config: ClassVar[bool] = False
|
|
create_missing: ClassVar[bool] = True
|
|
|
|
backend: "Backend" = field(repr=False)
|
|
filename: str = field()
|
|
|
|
_to_write: Optional[str] = field(init=False, default=None)
|
|
|
|
|
|
def __post_init__(self) -> None:
|
|
asyncio.ensure_future(self._write_loop())
|
|
|
|
|
|
@property
|
|
def path(self) -> Path:
|
|
"""Full path of the file, even if it doesn't exist yet."""
|
|
|
|
if self.is_config:
|
|
return Path(self.backend.appdirs.user_config_dir) / self.filename
|
|
|
|
return Path(self.backend.appdirs.user_data_dir) / self.filename
|
|
|
|
|
|
async def default_data(self):
|
|
"""Default content if the file doesn't exist."""
|
|
|
|
return ""
|
|
|
|
|
|
async def read(self):
|
|
"""Return content of the existing file on disk, or default content."""
|
|
|
|
try:
|
|
return self.path.read_text()
|
|
except FileNotFoundError:
|
|
default = await self.default_data()
|
|
|
|
if self.create_missing:
|
|
await self.write(default)
|
|
|
|
return default
|
|
|
|
|
|
async def write(self, data) -> None:
|
|
"""Request for the file to be written/updated with data."""
|
|
|
|
self._to_write = data
|
|
|
|
|
|
async def _write_loop(self) -> None:
|
|
"""Write/update file on disk with a 1 second cooldown."""
|
|
|
|
self.path.parent.mkdir(parents=True, exist_ok=True)
|
|
|
|
while True:
|
|
await asyncio.sleep(1)
|
|
|
|
if self._to_write is None:
|
|
continue
|
|
|
|
if not self.create_missing and not self.path.exists():
|
|
continue
|
|
|
|
async with atomic_write(self.path) as (new, done):
|
|
await new.write(self._to_write)
|
|
done()
|
|
|
|
self._to_write = None
|
|
|
|
|
|
|
|
@dataclass
|
|
class JSONDataFile(DataFile):
|
|
"""Represent a user data file in the JSON format."""
|
|
|
|
_data: Optional[Dict[str, Any]] = field(init=False, default=None)
|
|
|
|
|
|
def __getitem__(self, key: str) -> Any:
|
|
if self._data is None:
|
|
raise RuntimeError(f"{self}: read() hasn't been called yet")
|
|
|
|
return self._data[key]
|
|
|
|
|
|
async def default_data(self) -> JsonData:
|
|
return {}
|
|
|
|
|
|
async def read(self) -> JsonData:
|
|
"""Return content of the existing file on disk, or default content.
|
|
|
|
If the file has missing keys, the missing data will be merged and
|
|
written to disk before returning.
|
|
|
|
If `create_missing` is `True` and the file doesn't exist, it will be
|
|
created.
|
|
"""
|
|
|
|
try:
|
|
data = json.loads(self.path.read_text())
|
|
except FileNotFoundError:
|
|
if not self.create_missing:
|
|
data = await self.default_data()
|
|
self._data = data
|
|
return data
|
|
|
|
data = {}
|
|
except json.JSONDecodeError:
|
|
data = {}
|
|
|
|
all_data = await self.default_data()
|
|
dict_update_recursive(all_data, data)
|
|
|
|
if data != all_data:
|
|
await self.write(all_data)
|
|
|
|
self._data = all_data
|
|
return all_data
|
|
|
|
|
|
async def write(self, data: JsonData) -> None:
|
|
js = json.dumps(data, indent=4, ensure_ascii=False, sort_keys=True)
|
|
await super().write(js)
|
|
|
|
|
|
@dataclass
|
|
class Accounts(JSONDataFile):
|
|
"""Config file for saved matrix accounts: user ID, access tokens, etc."""
|
|
|
|
is_config = True
|
|
|
|
filename: str = "accounts.json"
|
|
|
|
|
|
async def any_saved(self) -> bool:
|
|
"""Return whether there are any accounts saved on disk."""
|
|
return bool(await self.read())
|
|
|
|
|
|
async def add(self, user_id: str) -> None:
|
|
"""Add an account to the config and write it on disk.
|
|
|
|
The account's details such as its access token are retrieved from
|
|
the corresponding `MatrixClient` in `backend.clients`.
|
|
"""
|
|
|
|
client = self.backend.clients[user_id]
|
|
|
|
await self.write({
|
|
**await self.read(),
|
|
client.user_id: {
|
|
"homeserver": client.homeserver,
|
|
"token": client.access_token,
|
|
"device_id": client.device_id,
|
|
},
|
|
})
|
|
|
|
|
|
async def delete(self, user_id: str) -> None:
|
|
"""Delete an account from the config and write it on disk."""
|
|
|
|
await self.write({
|
|
uid: info
|
|
for uid, info in (await self.read()).items() if uid != user_id
|
|
})
|
|
|
|
|
|
@dataclass
|
|
class UISettings(JSONDataFile):
|
|
"""Config file for QML interface settings and keybindings."""
|
|
|
|
is_config = True
|
|
|
|
filename: str = "settings.json"
|
|
|
|
|
|
async def default_data(self) -> JsonData:
|
|
return {
|
|
"alertOnMessageForMsec": 4000,
|
|
"alwaysCenterRoomHeader": False,
|
|
"compactMode": True,
|
|
"clearRoomFilterOnEnter": True,
|
|
"clearRoomFilterOnEscape": True,
|
|
"collapseSidePanesUnderWindowWidth": 400,
|
|
"hideProfileChangeEvents": True,
|
|
"hideMembershipEvents": False,
|
|
"hideUnknownEvents": False,
|
|
"theme": "Midnight.qpl",
|
|
"writeAliases": {},
|
|
"media": {
|
|
"autoLoad": True,
|
|
"autoPlay": False,
|
|
"autoPlayGIF": True,
|
|
"autoHideOSDAfterMsec": 3000,
|
|
"defaultVolume": 100,
|
|
"startMuted": False,
|
|
},
|
|
"keys": {
|
|
"startPythonDebugger": ["Alt+Shift+D"],
|
|
"toggleDebugConsole": ["Alt+Shift+C", "F1"],
|
|
"reloadConfig": ["Alt+Shift+R"],
|
|
|
|
"zoomIn": ["Ctrl++"],
|
|
"zoomOut": ["Ctrl+-"],
|
|
"zoomReset": ["Ctrl+="],
|
|
"toggleCompactMode": ["Ctrl+Alt+C"],
|
|
|
|
"scrollUp": ["Alt+Up", "Alt+K"],
|
|
"scrollDown": ["Alt+Down", "Alt+J"],
|
|
"scrollPageUp": ["Alt+Ctrl+Up", "Alt+Ctrl+K", "PgUp"],
|
|
"scrollPageDown": ["Alt+Ctrl+Down", "Alt+Ctrl+J", "PgDown"],
|
|
"scrollToTop":
|
|
["Alt+Ctrl+Shift+Up", "Alt+Ctrl+Shift+K", "Home"],
|
|
"scrollToBottom":
|
|
["Alt+Ctrl+Shift+Down", "Alt+Ctrl+Shift+J", "End"],
|
|
|
|
"previousTab": ["Alt+Shift+Left", "Alt+Shift+H"],
|
|
"nextTab": ["Alt+Shift+Right", "Alt+Shift+L"],
|
|
|
|
"toggleFocusMainPane": ["Alt+F"],
|
|
"clearRoomFilter": ["Alt+Shift+F"],
|
|
"accountSettings": ["Alt+A"],
|
|
"addNewAccount": ["Alt+Shift+A"],
|
|
"addNewChat": ["Alt+C"],
|
|
|
|
"goToLastPage": ["Ctrl+Tab"],
|
|
"goToPreviousRoom": ["Alt+Shift+Up", "Alt+Shift+K"],
|
|
"goToNextRoom": ["Alt+Shift+Down", "Alt+Shift+J"],
|
|
"toggleCollapseAccount": [ "Alt+O"],
|
|
"focusRoomAtIndex": {
|
|
"01": "Alt+1",
|
|
"02": "Alt+2",
|
|
"03": "Alt+3",
|
|
"04": "Alt+4",
|
|
"05": "Alt+5",
|
|
"06": "Alt+6",
|
|
"07": "Alt+7",
|
|
"08": "Alt+8",
|
|
"09": "Alt+9",
|
|
"10": "Alt+0",
|
|
},
|
|
|
|
"unselectAllMessages": ["Escape"],
|
|
"clearRoomMessages": ["Ctrl+L"],
|
|
"sendFile": ["Alt+S"],
|
|
"sendFileFromPathInClipboard": ["Alt+Shift+S"],
|
|
|
|
"toggleFocusRoomPane": ["Alt+R"],
|
|
},
|
|
}
|
|
|
|
|
|
@dataclass
|
|
class UIState(JSONDataFile):
|
|
"""File to save and restore the state of the QML interface."""
|
|
|
|
filename: str = "state.json"
|
|
|
|
|
|
async def default_data(self) -> JsonData:
|
|
return {
|
|
"collapseAccounts": {},
|
|
"page": "Pages/Default.qml",
|
|
"pageProperties": {},
|
|
}
|
|
|
|
|
|
@dataclass
|
|
class History(JSONDataFile):
|
|
"""File to save and restore lines typed by the user in QML components."""
|
|
|
|
filename: str = "history.json"
|
|
|
|
|
|
async def default_data(self) -> JsonData:
|
|
return {"console": []}
|
|
|
|
|
|
@dataclass
|
|
class Theme(DataFile):
|
|
"""A theme file defining the look of QML components."""
|
|
|
|
# Since it currently breaks at every update and the file format will be
|
|
# changed later, don't copy the theme to user data dir if it doesn't exist.
|
|
create_missing = False
|
|
|
|
|
|
@property
|
|
def path(self) -> Path:
|
|
data_dir = Path(self.backend.appdirs.user_data_dir)
|
|
return data_dir / "themes" / self.filename
|
|
|
|
|
|
async def default_data(self) -> str:
|
|
path = f"src/themes/{self.filename}"
|
|
|
|
try:
|
|
byte_content = pyotherside.qrc_get_file_contents(path)
|
|
except ValueError:
|
|
# App was compiled without QRC
|
|
async with aiofiles.open(path) as file:
|
|
return await file.read()
|
|
else:
|
|
return byte_content.decode()
|
|
|
|
|
|
async def read(self) -> str:
|
|
return convert_to_qml(await super().read())
|