From 817ae905fe6ef912f9dd15ad5a69ae3c31451bb1 Mon Sep 17 00:00:00 2001 From: miruka Date: Mon, 16 Dec 2019 07:02:42 -0400 Subject: [PATCH] Backend._any_clients(): only pick a healthy client When selecting a random client for get_profile(), thumbnail(), or download() requests, ignore those that aren't currently syncing. Also warn if we still can't find a healthy client after 30s. --- src/python/backend.py | 40 +++++++++++++++++++++++-------------- src/python/matrix_client.py | 13 +++++++++--- 2 files changed, 35 insertions(+), 18 deletions(-) diff --git a/src/python/backend.py b/src/python/backend.py index 72a0db75..a3bca65f 100644 --- a/src/python/backend.py +++ b/src/python/backend.py @@ -1,7 +1,7 @@ import asyncio import logging as log from pathlib import Path -from typing import Any, DefaultDict, Dict, List, Optional, Tuple +from typing import Any, Callable, DefaultDict, Dict, List, Optional, Tuple import hsluv @@ -82,10 +82,11 @@ class Backend: backend=self, user=user_id, homeserver=homeserver, device_id=device_id, ) - await client.resume(user_id=user_id, token=token, device_id=device_id) - self.clients[client.user_id] = client - self.models[Account][client.user_id] = Account(client.user_id) + self.clients[user_id] = client + self.models[Account][user_id] = Account(user_id) + + await client.resume(user_id=user_id, token=token, device_id=device_id) async def load_saved_accounts(self) -> Tuple[str, ...]: @@ -169,14 +170,22 @@ class Backend: # Client functions that don't need authentification - async def _any_client(self, prefer: str = "") -> MatrixClient: - while True: - if self.clients: - break - await asyncio.sleep(0.1) + async def _any_client(self, caller: Callable, *args, **kw) -> MatrixClient: + failures = 0 - return self.clients.get(prefer) or \ - next(c for c in self.clients.values()) + while True: + for client in self.clients.values(): + if client.syncing: + return client + + await asyncio.sleep(0.1) + failures += 1 + + if failures and failures % 300 == 0: + log.warn( + "No syncing client found after %ds of wait for %s %r %r", + failures / 10, caller.__name__, args, kw, + ) async def get_profile(self, user_id: str) -> nio.ProfileGetResponse: @@ -184,7 +193,7 @@ class Backend: return self.profile_cache[user_id] async with self.get_profile_locks[user_id]: - client = await self._any_client(prefer=user_id) + client = await self._any_client(self.get_profile, user_id) response = await client.get_profile(user_id) if isinstance(response, nio.ProfileGetError): @@ -198,8 +207,9 @@ class Backend: self, server_name: str, media_id: str, width: int, height: int, ) -> nio.ThumbnailResponse: - client = await self._any_client() - response = await client.thumbnail(server_name, media_id, width, height) + args = (server_name, media_id, width, height) + client = await self._any_client(self.thumbnail, *args) + response = await client.thumbnail(*args) if isinstance(response, nio.ThumbnailError): raise MatrixError.from_nio(response) @@ -211,7 +221,7 @@ class Backend: self, server_name: str, media_id: str, ) -> nio.DownloadResponse: - client = await self._any_client() + client = await self._any_client(self.download, server_name, media_id) response = await client.download(server_name, media_id) if isinstance(response, nio.DownloadError): diff --git a/src/python/matrix_client.py b/src/python/matrix_client.py index 36eace54..049fdd7b 100644 --- a/src/python/matrix_client.py +++ b/src/python/matrix_client.py @@ -134,9 +134,8 @@ class MatrixClient(nio.AsyncClient): async def resume(self, user_id: str, token: str, device_id: str) -> None: - self.user_id = user_id - self.access_token = token - self.device_id = device_id + response = nio.LoginResponse(user_id, device_id, token) + await self.receive_response(response) asyncio.ensure_future(self.start()) @@ -152,6 +151,14 @@ class MatrixClient(nio.AsyncClient): await self.close() + @property + def syncing(self) -> bool: + if not self.sync_task: + return False + + return not self.sync_task.done() + + async def start(self) -> None: def on_profile_response(future) -> None: exception = future.exception()