import asyncio import json import random from pathlib import Path from typing import Dict, Optional, Set, Tuple from atomicfile import AtomicFile from .app import App from .events import users from .html_filter import HTML_FILTER from .matrix_client import MatrixClient SavedAccounts = Dict[str, Dict[str, str]] CONFIG_LOCK = asyncio.Lock() class Backend: def __init__(self, app: App) -> None: self.app = app self.clients: Dict[str, MatrixClient] = {} self.past_tokens: Dict[str, str] = {} # {room_id: token} self.fully_loaded_rooms: Set[str] = set() # {room_id} self.pending_profile_requests: Set[str] = set() def __repr__(self) -> str: return f"{type(self).__name__}(clients={self.clients!r})" # Clients management async def login_client(self, user: str, password: str, device_id: Optional[str] = None, homeserver: str = "https://matrix.org") -> str: client = MatrixClient( backend=self, user=user, homeserver=homeserver, device_id=device_id ) await client.login(password) self.clients[client.user_id] = client users.AccountUpdated(client.user_id) return client.user_id async def resume_client(self, user_id: str, token: str, device_id: str, homeserver: str = "https://matrix.org") -> None: client = MatrixClient( backend=self, user=user_id, homeserver=homeserver, device_id=device_id ) await client.resume(user_id=user_id, token=token, device_id=device_id) self.clients[client.user_id] = client users.AccountUpdated(client.user_id) async def logout_client(self, user_id: str) -> None: client = self.clients.pop(user_id, None) if client: await client.logout() users.AccountDeleted(user_id) async def logout_all_clients(self) -> None: await asyncio.gather(*( self.logout_client(user_id) for user_id in self.clients.copy() )) # Saved account operations - TODO: Use aiofiles? @property def saved_accounts_path(self) -> Path: return Path(self.app.appdirs.user_config_dir) / "accounts.json" @property def saved_accounts(self) -> SavedAccounts: try: return json.loads(self.saved_accounts_path.read_text()) except (json.JSONDecodeError, FileNotFoundError): return {} async def has_saved_accounts(self) -> bool: return bool(self.saved_accounts) async def load_saved_accounts(self) -> Tuple[str, ...]: async def resume(user_id: str, info: Dict[str, str]) -> str: await self.resume_client( user_id = user_id, token = info["token"], device_id = info["device_id"], homeserver = info["homeserver"], ) return user_id return await asyncio.gather(*( resume(uid, info) for uid, info in self.saved_accounts.items() )) async def save_account(self, user_id: str) -> None: client = self.clients[user_id] await self._write_config({ **self.saved_accounts, client.user_id: { "homeserver": client.homeserver, "token": client.access_token, "device_id": client.device_id, } }) async def forget_account(self, user_id: str) -> None: await self._write_config({ uid: info for uid, info in self.saved_accounts.items() if uid != user_id }) async def _write_config(self, accounts: SavedAccounts) -> None: js = json.dumps(accounts, indent=4, ensure_ascii=False, sort_keys=True) async with CONFIG_LOCK: self.saved_accounts_path.parent.mkdir(parents=True, exist_ok=True) with AtomicFile(self.saved_accounts_path, "w") as new: new.write(js) # General functions async def request_user_update_event(self, user_id: str) -> None: client = self.clients.get(user_id, random.choice(tuple(self.clients.values()))) await client.request_user_update_event(user_id) @staticmethod def inlinify(html: str) -> str: return HTML_FILTER.filter_inline(html)