# moment/src/python/backend.py

import asyncio
import json
import random
from pathlib import Path
from typing import Dict, Optional, Tuple
from atomicfile import AtomicFile
from .app import App
from .events import users
from .html_filter import HTML_FILTER
from .matrix_client import MatrixClient
# Settings stored for one saved account. The code in this module reads and
# writes the "homeserver", "token" and "device_id" keys.
_AccountInfo = Dict[str, str]

# Content of the saved accounts file: user ID -> that account's settings.
SavedAccounts = Dict[str, _AccountInfo]

# Held while the saved accounts file is being written.
CONFIG_LOCK = asyncio.Lock()
class Backend:
    """Keep track of logged-in Matrix clients and of the saved accounts file.

    Clients are stored in ``self.clients``, keyed by the user ID each client
    reports. Saved accounts live in ``accounts.json`` inside the
    application's user config directory.
    """

    def __init__(self, app: App) -> None:
        # Application object; this class only reads
        # ``app.appdirs.user_config_dir`` from it.
        self.app = app

        # Logged-in clients, keyed by ``client.user_id``.
        self.clients: Dict[str, MatrixClient] = {}

    def __repr__(self) -> str:
        return f"{type(self).__name__}(clients={self.clients!r})"

    # Clients management

    async def login_client(self,
                           user:       str,
                           password:   str,
                           device_id:  Optional[str] = None,
                           homeserver: str = "https://matrix.org") -> str:
        """Log in with a password, register the client, return its user ID.

        If ``client.login()`` raises, the client is not registered and the
        exception propagates to the caller.
        """
        client = MatrixClient(
            backend=self, user=user, homeserver=homeserver, device_id=device_id
        )
        await client.login(password)

        self.clients[client.user_id] = client
        # NOTE(review): presumably constructing the event is what publishes
        # it - confirm in the events module.
        users.AccountUpdated(client.user_id)
        return client.user_id

    async def resume_client(self,
                            user_id:    str,
                            token:      str,
                            device_id:  str,
                            homeserver: str = "https://matrix.org") -> None:
        """Restore a session from a saved access token and register it."""
        client = MatrixClient(
            backend=self,
            user=user_id, homeserver=homeserver, device_id=device_id
        )
        await client.resume(user_id=user_id, token=token, device_id=device_id)

        self.clients[client.user_id] = client
        users.AccountUpdated(client.user_id)

    async def logout_client(self, user_id: str) -> None:
        """Unregister and log out the client for ``user_id``, if there is one.

        Nothing happens when no client is registered under ``user_id``.
        """
        # Pop first, so the client is unregistered even if logout() raises.
        client = self.clients.pop(user_id, None)
        if client:
            await client.logout()
            users.AccountDeleted(user_id)

    async def logout_all_clients(self) -> None:
        """Log out every registered client concurrently."""
        # Iterate over a copy: logout_client() mutates self.clients.
        await asyncio.gather(*(
            self.logout_client(user_id) for user_id in self.clients.copy()
        ))

    # Saved account operations - TODO: Use aiofiles?

    @property
    def saved_accounts_path(self) -> Path:
        """Location of ``accounts.json`` in the user config directory."""
        return Path(self.app.appdirs.user_config_dir) / "accounts.json"

    @property
    def saved_accounts(self) -> SavedAccounts:
        """Accounts parsed from the saved accounts file.

        The file is read again on every access. A missing file or one that
        isn't valid JSON yields an empty dict.
        """
        try:
            return json.loads(self.saved_accounts_path.read_text())
        except (json.JSONDecodeError, FileNotFoundError):
            return {}

    async def has_saved_accounts(self) -> bool:
        """Return whether the saved accounts file holds at least one entry."""
        return bool(self.saved_accounts)

    async def load_saved_accounts(self) -> Tuple[str, ...]:
        """Resume every saved account concurrently.

        Returns:
            The user IDs of the resumed accounts, in the order they appear
            in the saved accounts file.

        Raises:
            KeyError: if a saved entry lacks ``"token"``, ``"device_id"``
                or ``"homeserver"``.
        """
        async def resume(user_id: str, info: Dict[str, str]) -> str:
            await self.resume_client(
                user_id    = user_id,
                token      = info["token"],
                device_id  = info["device_id"],
                homeserver = info["homeserver"],
            )
            return user_id

        # asyncio.gather() returns a list; convert it so the result matches
        # the declared Tuple[str, ...] return type.
        return tuple(await asyncio.gather(*(
            resume(uid, info) for uid, info in self.saved_accounts.items()
        )))

    async def save_account(self, user_id: str) -> None:
        """Add or replace the saved entry for a logged-in client.

        Raises:
            KeyError: if no client is registered under ``user_id``.
        """
        client = self.clients[user_id]

        await self._write_config({
            **self.saved_accounts,
            client.user_id: {
                "homeserver": client.homeserver,
                "token":      client.access_token,
                "device_id":  client.device_id,
            }
        })

    async def forget_account(self, user_id: str) -> None:
        """Rewrite the saved accounts file without the ``user_id`` entry."""
        await self._write_config({
            uid: info
            for uid, info in self.saved_accounts.items() if uid != user_id
        })

    async def _write_config(self, accounts: SavedAccounts) -> None:
        """Replace the saved accounts file's content with ``accounts``."""
        js = json.dumps(accounts, indent=4, ensure_ascii=False, sort_keys=True)

        async with CONFIG_LOCK:
            # The config directory may not exist yet.
            self.saved_accounts_path.parent.mkdir(parents=True, exist_ok=True)

            with AtomicFile(self.saved_accounts_path, "w") as new:
                new.write(js)

    # General functions

    async def request_user_update_event(self, user_id: str) -> None:
        """Ask a client to request a user update event for ``user_id``.

        The client registered under ``user_id`` is used when there is one,
        otherwise a randomly chosen registered client makes the request.

        Raises:
            IndexError: if no client is registered at all.
        """
        client = self.clients.get(user_id)

        if client is None:
            if not self.clients:
                raise IndexError(
                    f"No logged-in client available to request a user "
                    f"update event for {user_id!r}"
                )
            # Only pick a random client when we actually need a fallback.
            client = random.choice(tuple(self.clients.values()))

        await client.request_user_update_event(user_id)

    @staticmethod
    def inlinify(html: str) -> str:
        """Return ``html`` passed through ``HTML_FILTER.filter_inline``."""
        return HTML_FILTER.filter_inline(html)