moment/src/python/backend.py
miruka 2f19ff493b Rewrite media caching (old image provider)
- Doesn't use pyotherside's image provider feature, for more flexibility
  and simplicity
- Suitable for supporting matrix media events and more later
- Avoid a lot of duplicate files that the old cache created due to
  server not returning what we expect, mistakes in Python/QML code, etc
- Changed file structure
  (e.g. thumbnails/32x32/<mxc id> instead of
   thumbnails/<mxc id>.32.32.crop)

- Backend.wait_until_account_exist: start issuing warnings if the
  function runs for more than 10s, which means in most case a bad user
  ID was passed

- New HMxcImage QML component, used in H(User/Room)Avatar
2019-11-03 13:48:12 -04:00

210 lines
6.6 KiB
Python

import asyncio
import logging as log
import random
from typing import Any, DefaultDict, Dict, List, Optional, Tuple, Union
import hsluv
import nio
from .app import App
from .matrix_client import MatrixClient
from .models.items import Account, Device, Event, Member, Room
from .models.model_store import ModelStore
ProfileResponse = Union[nio.ProfileGetResponse, nio.ProfileGetError]
class Backend:
def __init__(self, app: App) -> None:
self.app = app
from . import config_files
self.saved_accounts = config_files.Accounts(self)
self.ui_settings = config_files.UISettings(self)
self.ui_state = config_files.UIState(self)
self.models = ModelStore(allowed_key_types={
Account, # Logged-in accounts
(Device, str), # Devices of user_id
(Room, str), # Rooms for user_id
(Member, str), # Members in room_id
(Event, str, str), # Events for account user_id for room_id
})
self.clients: Dict[str, MatrixClient] = {}
self.profile_cache: Dict[str, nio.ProfileGetResponse] = {}
self.get_profile_locks: DefaultDict[str, asyncio.Lock] = \
DefaultDict(asyncio.Lock) # {user_id: lock}
def __repr__(self) -> str:
return f"{type(self).__name__}(clients={self.clients!r})"
# Clients management
async def login_client(self,
user: str,
password: str,
device_id: Optional[str] = None,
homeserver: str = "https://matrix.org",
) -> Tuple[bool, str]:
client = MatrixClient(
self, user=user, homeserver=homeserver, device_id=device_id,
)
try:
await client.login(password)
except RuntimeError as err: # XXX raise
await client.close()
return (False, err.args[0].message)
self.clients[client.user_id] = client
self.models[Account][client.user_id] = Account(client.user_id)
return (True, client.user_id)
async def resume_client(self,
user_id: str,
token: str,
device_id: str,
homeserver: str = "https://matrix.org") -> None:
client = MatrixClient(
backend=self,
user=user_id, homeserver=homeserver, device_id=device_id,
)
await client.resume(user_id=user_id, token=token, device_id=device_id)
self.clients[client.user_id] = client
self.models[Account][client.user_id] = Account(client.user_id)
async def load_saved_accounts(self) -> Tuple[str, ...]:
async def resume(user_id: str, info: Dict[str, str]) -> str:
await self.resume_client(
user_id = user_id,
token = info["token"],
device_id = info["device_id"],
homeserver = info["homeserver"],
)
return user_id
return await asyncio.gather(*(
resume(uid, info)
for uid, info in (await self.saved_accounts.read()).items()
))
async def logout_client(self, user_id: str) -> None:
client = self.clients.pop(user_id, None)
if client:
self.models[Account].pop(user_id, None)
await client.logout()
await self.saved_accounts.delete(user_id)
async def wait_until_client_exists(self, user_id: str = "") -> None:
loops = 0
while True:
if user_id and user_id in self.clients:
return
if not user_id and self.clients:
return
if loops and loops % 100 == 0: # every 10s except first time
log.warning("Waiting for account %s to exist, %ds passed",
user_id, loops // 10)
await asyncio.sleep(0.1)
loops += 1
# General functions
@staticmethod
def hsluv(hue: int, saturation: int, lightness: int) -> List[float]:
# (0-360, 0-100, 0-100) -> [0-1, 0-1, 0-1]
return hsluv.hsluv_to_rgb([hue, saturation, lightness])
@staticmethod
async def check_exported_keys_passphrase(file_path: str, passphrase: str,
) -> Union[bool, Tuple[str, bool]]:
"""Check if the exported keys file can be decrypted with passphrase.
Returns True on success, False is the passphrase is invalid, or
an (error_message, error_is_translated) tuple if another error occured.
"""
try:
nio.crypto.key_export.decrypt_and_read(file_path, passphrase)
return True
except OSError as err: # XXX raise
return (f"{file_path}: {err.strerror}", True)
except ValueError as err: # XXX raise
if str(err).startswith("HMAC check failed"):
return False
return (str(err), False)
async def load_settings(self) -> tuple:
from .config_files import Theme
settings = await self.ui_settings.read()
ui_state = await self.ui_state.read()
theme = await Theme(self, settings["theme"]).read()
return (settings, ui_state, theme)
async def get_profile(self, user_id: str) -> ProfileResponse:
if user_id in self.profile_cache:
return self.profile_cache[user_id]
async with self.get_profile_locks[user_id]:
if not self.clients:
await self.wait_until_client_exists()
client = self.clients.get(
user_id,
random.choice(tuple(self.clients.values())),
)
response = await client.get_profile(user_id)
if isinstance(response, nio.ProfileGetError):
log.warning("%s: %s", user_id, response)
self.profile_cache[user_id] = response
return response
async def get_flat_sidepane_data(self) -> List[Dict[str, Any]]:
data = []
for account in sorted(self.models[Account].values()):
data.append({
"type": "Account",
"id": account.user_id,
"user_id": account.user_id,
"data": account.serialized,
})
for room in sorted(self.models[Room, account.user_id].values()):
data.append({
"type": "Room",
"id": "/".join((account.user_id, room.room_id)),
"user_id": account.user_id,
"data": room.serialized,
})
return data