Backend._any_clients(): only pick a healthy client

When selecting a random client for get_profile(), thumbnail(), or
download() requests, ignore those that aren't currently syncing.
Also warn if we still can't find a healthy client after 30s.
This commit is contained in:
miruka 2019-12-16 07:02:42 -04:00
parent 5b4146c854
commit 817ae905fe
2 changed files with 35 additions and 18 deletions

View File

@ -1,7 +1,7 @@
import asyncio import asyncio
import logging as log import logging as log
from pathlib import Path from pathlib import Path
from typing import Any, DefaultDict, Dict, List, Optional, Tuple from typing import Any, Callable, DefaultDict, Dict, List, Optional, Tuple
import hsluv import hsluv
@ -82,10 +82,11 @@ class Backend:
backend=self, backend=self,
user=user_id, homeserver=homeserver, device_id=device_id, user=user_id, homeserver=homeserver, device_id=device_id,
) )
await client.resume(user_id=user_id, token=token, device_id=device_id)
self.clients[client.user_id] = client self.clients[user_id] = client
self.models[Account][client.user_id] = Account(client.user_id) self.models[Account][user_id] = Account(user_id)
await client.resume(user_id=user_id, token=token, device_id=device_id)
async def load_saved_accounts(self) -> Tuple[str, ...]: async def load_saved_accounts(self) -> Tuple[str, ...]:
@ -169,14 +170,22 @@ class Backend:
# Client functions that don't need authentification # Client functions that don't need authentification
async def _any_client(self, prefer: str = "") -> MatrixClient: async def _any_client(self, caller: Callable, *args, **kw) -> MatrixClient:
while True: failures = 0
if self.clients:
break
await asyncio.sleep(0.1)
return self.clients.get(prefer) or \ while True:
next(c for c in self.clients.values()) for client in self.clients.values():
if client.syncing:
return client
await asyncio.sleep(0.1)
failures += 1
if failures and failures % 300 == 0:
log.warn(
"No syncing client found after %ds of wait for %s %r %r",
failures / 10, caller.__name__, args, kw,
)
async def get_profile(self, user_id: str) -> nio.ProfileGetResponse: async def get_profile(self, user_id: str) -> nio.ProfileGetResponse:
@ -184,7 +193,7 @@ class Backend:
return self.profile_cache[user_id] return self.profile_cache[user_id]
async with self.get_profile_locks[user_id]: async with self.get_profile_locks[user_id]:
client = await self._any_client(prefer=user_id) client = await self._any_client(self.get_profile, user_id)
response = await client.get_profile(user_id) response = await client.get_profile(user_id)
if isinstance(response, nio.ProfileGetError): if isinstance(response, nio.ProfileGetError):
@ -198,8 +207,9 @@ class Backend:
self, server_name: str, media_id: str, width: int, height: int, self, server_name: str, media_id: str, width: int, height: int,
) -> nio.ThumbnailResponse: ) -> nio.ThumbnailResponse:
client = await self._any_client() args = (server_name, media_id, width, height)
response = await client.thumbnail(server_name, media_id, width, height) client = await self._any_client(self.thumbnail, *args)
response = await client.thumbnail(*args)
if isinstance(response, nio.ThumbnailError): if isinstance(response, nio.ThumbnailError):
raise MatrixError.from_nio(response) raise MatrixError.from_nio(response)
@ -211,7 +221,7 @@ class Backend:
self, server_name: str, media_id: str, self, server_name: str, media_id: str,
) -> nio.DownloadResponse: ) -> nio.DownloadResponse:
client = await self._any_client() client = await self._any_client(self.download, server_name, media_id)
response = await client.download(server_name, media_id) response = await client.download(server_name, media_id)
if isinstance(response, nio.DownloadError): if isinstance(response, nio.DownloadError):

View File

@ -134,9 +134,8 @@ class MatrixClient(nio.AsyncClient):
async def resume(self, user_id: str, token: str, device_id: str) -> None: async def resume(self, user_id: str, token: str, device_id: str) -> None:
self.user_id = user_id response = nio.LoginResponse(user_id, device_id, token)
self.access_token = token await self.receive_response(response)
self.device_id = device_id
asyncio.ensure_future(self.start()) asyncio.ensure_future(self.start())
@ -152,6 +151,14 @@ class MatrixClient(nio.AsyncClient):
await self.close() await self.close()
@property
def syncing(self) -> bool:
if not self.sync_task:
return False
return not self.sync_task.done()
async def start(self) -> None: async def start(self) -> None:
def on_profile_response(future) -> None: def on_profile_response(future) -> None:
exception = future.exception() exception = future.exception()