- Allow using invisible mode on servers not supporting presence, to make use of the prevention of sending typing notifications and read marker updates - For servers not supporting presence, display the account's presence orb in the left pane with half opacity - Indicate in the presence orb tooltip when the presence we set hasn't yet been noticed by the server/the server doesn't support presence - When reconnecting after being offline, if the server doesn't indicate we have a status message [set from another client], restore any previous message we had set in the current client - Show our status message striked out when we're invisible or offline to indicate that it isn't being broadcasted - Some code style cleanups - Try to handle cases where we set a presence, but receive a new presence event for our account before the server takes notice of that new presence we want, which probably resulted in the "account keeps switching between online and unavailable every few sec" glitch
635 lines
21 KiB
Python
635 lines
21 KiB
Python
# Copyright Mirage authors & contributors <https://github.com/mirukana/mirage>
|
|
# SPDX-License-Identifier: LGPL-3.0-or-later
|
|
|
|
import asyncio
|
|
import io
|
|
import logging as log
|
|
import os
|
|
import re
|
|
import sys
|
|
import time
|
|
import traceback
|
|
import wave
|
|
from datetime import datetime, timedelta
|
|
from pathlib import Path
|
|
from typing import Any, DefaultDict, Dict, List, Optional, Tuple, Union
|
|
from urllib.parse import urlparse
|
|
|
|
import aiohttp
|
|
import nio
|
|
import plyer
|
|
import pyotherside
|
|
import simpleaudio
|
|
from appdirs import AppDirs
|
|
from nio.client.async_client import client_session
|
|
|
|
from . import __app_name__
|
|
from .errors import MatrixError, MatrixInvalidAccessToken
|
|
from .matrix_client import MatrixClient
|
|
from .media_cache import MediaCache
|
|
from .models import SyncId
|
|
from .models.filters import FieldStringFilter
|
|
from .models.items import Account, Event, Homeserver, PingStatus
|
|
from .models.model import Model
|
|
from .models.model_store import ModelStore
|
|
from .presence import Presence
|
|
from .sso_server import SSOServer
|
|
from .user_files import (
|
|
Accounts, History, NewTheme, Pre070Settings, Settings, Theme, UIState,
|
|
)
|
|
|
|
# Logging configuration
|
|
log.getLogger().setLevel(log.INFO)
|
|
nio.logger_group.level = nio.log.logbook.ERROR
|
|
nio.log.logbook.StreamHandler(sys.stderr).push_application()
|
|
|
|
|
|
class Backend:
|
|
"""Manage matrix clients and provide other useful general methods.
|
|
|
|
Attributes:
|
|
saved_accounts: User config file for saved matrix account.
|
|
|
|
settings: User config file for general UI and backend settings.
|
|
|
|
ui_state: User data file for saving/restoring QML UI state.
|
|
|
|
history: User data file for saving/restoring text typed into QML
|
|
components.
|
|
|
|
models: A mapping containing our data models that are
|
|
synchronized between the Python backend and the QML UI.
|
|
The models should only ever be modified from the backend.
|
|
|
|
If a non-existent key is accessed, it is created and an
|
|
associated `Model` and returned.
|
|
|
|
The mapping keys are the `Model`'s synchronization ID,
|
|
which a strings or tuple of strings.
|
|
|
|
Currently used sync ID throughout the code are:
|
|
|
|
- `"accounts"`: logged-in accounts;
|
|
|
|
- `("<user_id>", "pushrules")`: push rules configured for our
|
|
account `user_id`.
|
|
|
|
- `("<user_id>", "rooms")`: rooms our account `user_id` is part of;
|
|
|
|
- `("<user_id>", "transfers")`: ongoing or failed file
|
|
uploads/downloads for our account `user_id`;
|
|
|
|
- `("<user_id>", "<room_id>", "members")`: members in the room
|
|
`room_id` that our account `user_id` is part of;
|
|
|
|
- `("<user_id>", "<room_id>", "events")`: state events and messages
|
|
in the room `room_id` that our account `user_id` is part of.
|
|
|
|
Special models:
|
|
|
|
- `"all_rooms"`: See `models.special_models.AllRooms` docstring
|
|
|
|
- `"matching_accounts"`
|
|
See `models.special_models.MatchingAccounts` docstring
|
|
|
|
- `("<user_id>", "<room_id>", "filtered_members")`:
|
|
See `models.special_models.FilteredMembers` docstring
|
|
|
|
- `("filtered_homeservers")`:
|
|
See `models.special_models.FilteredHomeservers` docstring
|
|
|
|
clients: A `{user_id: MatrixClient}` dict for the logged-in clients
|
|
we managed. Every client is logged to one matrix account.
|
|
|
|
media_cache: A matrix media cache for downloaded files.
|
|
|
|
presences: A `{user_id: Presence}` dict for storing presence info about
|
|
matrix users registered on Mirage.
|
|
|
|
mxc_events: A dict storing media `Event` model items for any account
|
|
that have the same mxc URI
|
|
"""
|
|
|
|
def __init__(self) -> None:
|
|
self.appdirs = AppDirs(appname=__app_name__, roaming=True)
|
|
|
|
self.models = ModelStore()
|
|
|
|
self.saved_accounts = Accounts(self)
|
|
self.settings = Settings(self)
|
|
self.ui_state = UIState(self)
|
|
self.history = History(self)
|
|
self.theme = Theme(self, self.settings.General.theme)
|
|
# self.new_theme = NewTheme(self, self.settings.General.new_theme)
|
|
self.new_theme = NewTheme(self, ".new.py") # TODO
|
|
Pre070Settings(self)
|
|
|
|
self.clients: Dict[str, MatrixClient] = {}
|
|
|
|
self._sso_server: Optional[SSOServer] = None
|
|
self._sso_server_task: Optional[asyncio.Future] = None
|
|
|
|
self.profile_cache: Dict[str, nio.ProfileGetResponse] = {}
|
|
self.get_profile_locks: DefaultDict[str, asyncio.Lock] = \
|
|
DefaultDict(asyncio.Lock) # {user_id: lock}
|
|
|
|
self.send_locks: DefaultDict[str, asyncio.Lock] = \
|
|
DefaultDict(asyncio.Lock) # {room_id: lock}
|
|
|
|
cache_dir = Path(
|
|
os.environ.get("MIRAGE_CACHE_DIR") or self.appdirs.user_cache_dir,
|
|
)
|
|
|
|
self.media_cache: MediaCache = MediaCache(self, cache_dir)
|
|
|
|
self.presences: Dict[str, Presence] = {}
|
|
|
|
self.concurrent_get_presence_limit = asyncio.BoundedSemaphore(8)
|
|
|
|
self.mxc_events: DefaultDict[str, List[Event]] = DefaultDict(list)
|
|
|
|
self.notification_avatar_cache: Dict[str, Path] = {} # {mxc: path}
|
|
self.notifications_working: bool = True
|
|
self.audio_working: bool = True
|
|
|
|
|
|
def __repr__(self) -> str:
|
|
return f"{type(self).__name__}(clients={self.clients!r})"
|
|
|
|
|
|
# Clients management
|
|
|
|
async def server_info(self, homeserver: str) -> Tuple[str, List[str]]:
|
|
"""Return server's real URL and supported login flows.
|
|
|
|
Retrieving the real URL uses the `.well-known` API.
|
|
Possible login methods include `m.login.password` or `m.login.sso`.
|
|
"""
|
|
|
|
if not re.match(r"https?://", homeserver):
|
|
homeserver = f"http://{homeserver}"
|
|
|
|
client = MatrixClient(self, homeserver=homeserver)
|
|
http_re = re.compile("^http://")
|
|
is_local = urlparse(client.homeserver).netloc.split(":")[0] in (
|
|
"localhost", "127.0.0.1", "::1",
|
|
)
|
|
|
|
try:
|
|
client.homeserver = (await client.discovery_info()).homeserver_url
|
|
except MatrixError:
|
|
# This is either already the real URL, or an invalid URL.
|
|
pass
|
|
|
|
try:
|
|
try:
|
|
login_response = await client.login_info()
|
|
except (asyncio.TimeoutError, MatrixError):
|
|
# Maybe we still have a http URL and server only supports https
|
|
client.homeserver = http_re.sub("https://", client.homeserver)
|
|
login_response = await client.login_info()
|
|
|
|
# If we still have a http URL and server redirected to https
|
|
if login_response.transport_response.real_url.scheme == "https":
|
|
client.homeserver = http_re.sub("https://", client.homeserver)
|
|
|
|
# If we still have a http URL and server accept both http and https
|
|
if http_re.match(client.homeserver) and not is_local:
|
|
original = client.homeserver
|
|
client.homeserver = http_re.sub("https://", client.homeserver)
|
|
|
|
try:
|
|
await asyncio.wait_for(client.login_info(), timeout=6)
|
|
except (asyncio.TimeoutError, MatrixError):
|
|
client.homeserver = original
|
|
|
|
return (client.homeserver, login_response.flows)
|
|
finally:
|
|
await client.close()
|
|
|
|
|
|
async def password_auth(
|
|
self, user: str, password: str, homeserver: str,
|
|
) -> str:
|
|
"""Create & register a `MatrixClient`, login using the password
|
|
and return the user ID we get.
|
|
"""
|
|
|
|
client = MatrixClient(self, user=user, homeserver=homeserver)
|
|
return await self._do_login(client, password=password)
|
|
|
|
|
|
async def start_sso_auth(self, homeserver: str) -> str:
|
|
"""Start SSO server and return URL to open in the user's browser.
|
|
|
|
See the `sso_server.SSOServer` class documentation.
|
|
Once the returned URL has been opened in the user's browser
|
|
(done from QML), `MatrixClient.continue_sso_auth()` should be called.
|
|
"""
|
|
|
|
server = SSOServer(homeserver)
|
|
self._sso_server = server
|
|
self._sso_server_task = asyncio.ensure_future(server.wait_for_token())
|
|
return server.url_to_open
|
|
|
|
|
|
async def continue_sso_auth(self) -> str:
|
|
"""Wait for the started SSO server to get a token, then login.
|
|
|
|
`MatrixClient.start_sso_auth()` must be called first.
|
|
Creates and register a `MatrixClient` for logging in.
|
|
Returns the user ID we get from logging in.
|
|
"""
|
|
|
|
if not self._sso_server or not self._sso_server_task:
|
|
raise RuntimeError("Must call Backend.start_sso_auth() first")
|
|
|
|
await self._sso_server_task
|
|
homeserver = self._sso_server.for_homeserver
|
|
token = self._sso_server_task.result()
|
|
self._sso_server_task = None
|
|
self._sso_server = None
|
|
|
|
client = MatrixClient(self, homeserver=homeserver)
|
|
return await self._do_login(client, token=token)
|
|
|
|
|
|
async def _do_login(self, client: MatrixClient, **login_kwargs) -> str:
|
|
"""Login on a client. If successful, register it and return user ID."""
|
|
|
|
try:
|
|
await client.login(**login_kwargs)
|
|
except MatrixError:
|
|
await client.close()
|
|
raise
|
|
|
|
if client.user_id in self.clients:
|
|
await client.logout()
|
|
return client.user_id
|
|
|
|
self.clients[client.user_id] = client
|
|
return client.user_id
|
|
|
|
|
|
async def resume_client(
|
|
self,
|
|
user_id: str,
|
|
token: str,
|
|
device_id: str,
|
|
homeserver: str,
|
|
state: str = "online",
|
|
status_msg: str = "",
|
|
) -> None:
|
|
"""Create and register a `MatrixClient` with known account details."""
|
|
|
|
client = MatrixClient(
|
|
self, user=user_id, homeserver=homeserver, device_id=device_id,
|
|
)
|
|
|
|
self.clients[user_id] = client
|
|
|
|
await client.resume(user_id, token, device_id, state, status_msg)
|
|
|
|
|
|
async def load_saved_accounts(self) -> List[str]:
|
|
"""Call `resume_client` for all saved accounts in user config."""
|
|
|
|
async def resume(user_id: str, info: Dict[str, Any]) -> str:
|
|
# Get or create account model
|
|
self.models["accounts"].setdefault(
|
|
user_id, Account(user_id, info.get("order", -1)),
|
|
)
|
|
|
|
await self.resume_client(
|
|
user_id = user_id,
|
|
token = info["token"],
|
|
device_id = info["device_id"],
|
|
homeserver = info["homeserver"],
|
|
state = info.get("presence", "online"),
|
|
status_msg = info.get("status_msg", ""),
|
|
)
|
|
|
|
return user_id
|
|
|
|
return await asyncio.gather(*(
|
|
resume(user_id, info)
|
|
for user_id, info in self.saved_accounts.items()
|
|
if info.get("enabled", True)
|
|
))
|
|
|
|
|
|
async def logout_client(self, user_id: str) -> None:
|
|
"""Log a `MatrixClient` out and unregister it from our models."""
|
|
|
|
client = self.clients.pop(user_id, None)
|
|
|
|
if client:
|
|
try:
|
|
await client.logout()
|
|
except MatrixInvalidAccessToken:
|
|
pass
|
|
|
|
self.models["accounts"].pop(user_id, None)
|
|
self.models["matching_accounts"].pop(user_id, None)
|
|
self.models[user_id, "transfers"].clear()
|
|
|
|
for room_id in self.models[user_id, "rooms"]:
|
|
self.models["all_rooms"].pop(room_id, None)
|
|
self.models[user_id, room_id, "members"].clear()
|
|
self.models[user_id, room_id, "events"].clear()
|
|
self.models[user_id, room_id, "filtered_members"].clear()
|
|
|
|
self.models[user_id, "rooms"].clear()
|
|
|
|
await self.saved_accounts.forget(user_id)
|
|
|
|
|
|
async def terminate_clients(self) -> None:
|
|
"""Call every `MatrixClient`'s `terminate()` method."""
|
|
|
|
log.info("Setting clients offline...")
|
|
tasks = [client.terminate() for client in self.clients.values()]
|
|
await asyncio.gather(*tasks)
|
|
|
|
|
|
async def get_client(self, user_id: str, _debug_info=None) -> MatrixClient:
|
|
"""Wait until a `MatrixClient` is registered in model and return it."""
|
|
|
|
failures = 0
|
|
|
|
while True:
|
|
if user_id in self.clients:
|
|
return self.clients[user_id]
|
|
|
|
if failures and failures % 100 == 0: # every 10s except first time
|
|
log.warning(
|
|
"Client %r not found after %ds, _debug_info:\n%r",
|
|
user_id, failures / 10, _debug_info,
|
|
)
|
|
|
|
await asyncio.sleep(0.1)
|
|
failures += 1
|
|
|
|
|
|
# Multi-client Matrix functions
|
|
|
|
async def update_room_read_marker(
|
|
self, room_id: str, event_id: str,
|
|
) -> None:
|
|
"""Update room's read marker to an event for all accounts part of it.
|
|
"""
|
|
|
|
async def update(client: MatrixClient) -> None:
|
|
room = self.models[client.user_id, "rooms"].get(room_id)
|
|
account = self.models["accounts"][client.user_id]
|
|
|
|
if room:
|
|
room.set_fields(unreads=0, highlights=0, local_unreads=False)
|
|
await client.update_account_unread_counts()
|
|
|
|
if account.presence not in [
|
|
Presence.State.echo_invisible,
|
|
Presence.State.invisible,
|
|
Presence.State.offline,
|
|
]:
|
|
await client.update_receipt_marker(room_id, event_id)
|
|
|
|
await asyncio.gather(*[update(c) for c in self.clients.values()])
|
|
|
|
|
|
async def verify_device(
|
|
self, user_id: str, device_id: str, ed25519_key: str,
|
|
) -> None:
|
|
"""Mark a device as verified on all our accounts."""
|
|
|
|
for client in self.clients.values():
|
|
try:
|
|
device = client.device_store[user_id][device_id]
|
|
except KeyError:
|
|
continue
|
|
|
|
if device.ed25519 == ed25519_key:
|
|
client.verify_device(device)
|
|
|
|
|
|
async def blacklist_device(
|
|
self, user_id: str, device_id: str, ed25519_key: str,
|
|
) -> None:
|
|
"""Mark a device as blacklisted on all our accounts."""
|
|
|
|
for client in self.clients.values():
|
|
try:
|
|
# This won't include the client's current device, as expected
|
|
device = client.device_store[user_id][device_id]
|
|
except KeyError:
|
|
continue
|
|
|
|
if device.ed25519 == ed25519_key:
|
|
client.blacklist_device(device)
|
|
|
|
|
|
# General functions
|
|
|
|
async def get_config_dir(self) -> Path:
|
|
return Path(self.appdirs.user_config_dir)
|
|
|
|
|
|
async def get_theme_dir(self) -> Path:
|
|
path = Path(self.appdirs.user_data_dir) / "themes"
|
|
path.mkdir(parents=True, exist_ok=True)
|
|
return path
|
|
|
|
|
|
async def get_settings(self) -> Tuple[dict, UIState, History, str, dict]:
|
|
"""Return parsed user config files for QML."""
|
|
return (
|
|
self.settings.qml_data,
|
|
self.ui_state.qml_data,
|
|
self.history.qml_data,
|
|
self.theme.qml_data,
|
|
self.new_theme.qml_data,
|
|
)
|
|
|
|
|
|
async def set_string_filter(
|
|
self, model_id: Union[SyncId, List[str]], value: str,
|
|
) -> None:
|
|
"""Set a FieldStringFilter (or derived class) model's filter property.
|
|
|
|
This should only be called from QML.
|
|
"""
|
|
|
|
if isinstance(model_id, list): # QML can't pass tuples
|
|
model_id = tuple(model_id)
|
|
|
|
model = Model.proxies[model_id]
|
|
|
|
if not isinstance(model, FieldStringFilter):
|
|
raise TypeError("model_id must point to a FieldStringFilter")
|
|
|
|
model.filter = value
|
|
|
|
|
|
async def set_account_collapse(self, user_id: str, collapse: bool) -> None:
|
|
"""Call `set_account_collapse()` on the `all_rooms` model.
|
|
|
|
This should only be called from QML.
|
|
"""
|
|
self.models["all_rooms"].set_account_collapse(user_id, collapse)
|
|
|
|
|
|
async def _ping_homeserver(
|
|
self, session: aiohttp.ClientSession, homeserver_url: str,
|
|
) -> None:
|
|
"""Ping a homeserver present in our model and set its `ping` field."""
|
|
|
|
item = self.models["homeservers"][homeserver_url]
|
|
times = []
|
|
|
|
for i in range(16):
|
|
start = time.time()
|
|
|
|
try:
|
|
await session.get(f"{homeserver_url}/_matrix/client/versions")
|
|
except aiohttp.ClientError as err:
|
|
log.warning("Failed pinging %s: %r", homeserver_url, err)
|
|
item.status = PingStatus.Failed
|
|
return
|
|
|
|
times.append(round((time.time() - start) * 1000))
|
|
|
|
if i == 7 or i == 15:
|
|
item.set_fields(
|
|
ping=sum(times) // len(times), status=PingStatus.Done,
|
|
)
|
|
|
|
|
|
def _get_homeserver_stability(
|
|
self, logs: List[Dict[str, Any]],
|
|
) -> Tuple[float, List[timedelta]]:
|
|
"""Return server stability % and a list of downtime durations."""
|
|
|
|
stability = 100.0
|
|
durations = []
|
|
|
|
for period in logs:
|
|
started_at = datetime.fromtimestamp(period["datetime"])
|
|
time_since_now = datetime.now() - started_at
|
|
|
|
if time_since_now.days > 30 or period["type"] != 1: # 1 = downtime
|
|
continue
|
|
|
|
lasted_minutes = period["duration"] / 60
|
|
durations.append(timedelta(seconds=period["duration"]))
|
|
|
|
stability -= (
|
|
(lasted_minutes * stability / 1000) /
|
|
max(1, time_since_now.days / 3)
|
|
)
|
|
|
|
return (stability, durations)
|
|
|
|
|
|
async def fetch_homeservers(self) -> None:
|
|
"""Retrieve a list of public homeservers and add them to our model."""
|
|
|
|
@client_session # need to trigger this decorator for creation
|
|
async def have_session_be_created(*_):
|
|
pass
|
|
|
|
# We just want that client's aiohttp session, that way we don't have
|
|
# to depend ourselves on aiohttp + aiohttp-socks
|
|
proxy = self.settings.General.proxy
|
|
client = nio.AsyncClient(homeserver="", proxy=proxy)
|
|
await have_session_be_created(client)
|
|
|
|
|
|
session = client.client_session
|
|
# aiohttp only has "timeout" in 3.7.0+
|
|
timeout = getattr(session, "timeout", session._timeout)
|
|
session = type(session)(
|
|
raise_for_status = True,
|
|
timeout = type(timeout)(total=20),
|
|
connector = session.connector,
|
|
)
|
|
|
|
api_list = "https://publiclist.anchel.nl/publiclist.json"
|
|
response = await session.get(api_list)
|
|
coros = []
|
|
|
|
for server in (await response.json()):
|
|
homeserver_url = server["homeserver"]
|
|
|
|
if homeserver_url.startswith("http://"): # insecure server
|
|
continue
|
|
|
|
if not re.match(r"^https?://.+", homeserver_url):
|
|
homeserver_url = f"https://{homeserver_url}"
|
|
|
|
if server["country"] == "USA":
|
|
server["country"] = "United States"
|
|
|
|
stability, durations = \
|
|
self._get_homeserver_stability(server["monitor"]["logs"])
|
|
|
|
self.models["homeservers"][homeserver_url] = Homeserver(
|
|
id = homeserver_url,
|
|
name = server["name"],
|
|
site_url = server["url"],
|
|
country = server["country"],
|
|
stability = stability,
|
|
downtimes_ms = [d.total_seconds() * 1000 for d in durations],
|
|
)
|
|
|
|
coros.append(self._ping_homeserver(session, homeserver_url))
|
|
|
|
await asyncio.gather(*coros)
|
|
await session.close()
|
|
|
|
|
|
async def desktop_notify(
|
|
self, title: str, body: str = "", image: Union[Path, str] = "",
|
|
) -> None:
|
|
# XXX: images on windows must be .ICO
|
|
|
|
try:
|
|
plyer.notification.notify(
|
|
title = title,
|
|
message = body,
|
|
app_name = __app_name__,
|
|
app_icon = str(image),
|
|
timeout = 10,
|
|
toast = False,
|
|
)
|
|
self.notifications_working = True
|
|
except Exception: # noqa
|
|
if self.notifications_working:
|
|
trace = traceback.format_exc().rstrip()
|
|
log.error("Sending desktop notification failed\n%s", trace)
|
|
self.notifications_working = False
|
|
|
|
|
|
async def sound_notify(self) -> None:
|
|
path = self.settings.Notifications.default_sound
|
|
path = str(Path(path).expanduser())
|
|
|
|
if path == "default.wav":
|
|
path = "src/sounds/default.wav"
|
|
|
|
try:
|
|
content = pyotherside.qrc_get_file_contents(path)
|
|
except ValueError:
|
|
sa = simpleaudio.WaveObject.from_wave_file(path)
|
|
else:
|
|
wave_read = wave.open(io.BytesIO(content))
|
|
sa = simpleaudio.WaveObject.from_wave_read(wave_read)
|
|
|
|
try:
|
|
sa.play()
|
|
self.audio_working = True
|
|
except Exception as e: # noqa
|
|
if self.audio_working:
|
|
trace = traceback.format_exc().rstrip()
|
|
log.error("Playing audio failed\n%s", trace)
|
|
self.audio_working = False
|