diff --git a/src/backend/matrix_client.py b/src/backend/matrix_client.py index df25aa4c..98779ee6 100644 --- a/src/backend/matrix_client.py +++ b/src/backend/matrix_client.py @@ -75,6 +75,12 @@ REPLY_FALLBACK = ( ) +class SyncFilterIds(NamedTuple): + """Uploaded filter IDs for various API.""" + + first: str + others: str + class UploadReturn(NamedTuple): """Details for an uploaded file.""" @@ -185,10 +191,12 @@ class MatrixClient(nio.AsyncClient): self.upload_tasks: Dict[UUID, asyncio.Task] = {} self.send_message_tasks: Dict[UUID, asyncio.Task] = {} - self._presence: str = "" - self.first_sync_done: asyncio.Event = asyncio.Event() - self.first_sync_date: Optional[datetime] = None - self.last_sync_error: Optional[Exception] = None + self._presence: str = "" + self._sync_filter_ids: Optional[SyncFilterIds] = None + self._sync_filter_ids_lock: asyncio.Lock = asyncio.Lock() + self.first_sync_done: asyncio.Event = asyncio.Event() + self.first_sync_date: Optional[datetime] = None + self.last_sync_error: Optional[Exception] = None self.past_tokens: Dict[str, str] = {} # {room_id: token} self.fully_loaded_rooms: Set[str] = set() # {room_id} @@ -370,25 +378,17 @@ class MatrixClient(nio.AsyncClient): ) self.server_config_task.add_done_callback(on_server_config_response) - filter1 = deepcopy(self.lazy_load_filter) - utils.dict_update_recursive(filter1, self.low_limit_filter) - - cfg = self.backend.ui_settings - - if cfg["hideUnknownEvents"]: - filter1["room"]["timeline"]["not_types"].extend( - self.no_unknown_events_filter["room"]["timeline"]["not_types"], - ) - await self.auto_verify_all_other_accounts() while True: try: + sync_filter_ids = await self.sync_filter_ids() + self.sync_task = asyncio.ensure_future(self.sync_forever( timeout = 10_000, loop_sleep_time = 1000, - sync_filter = self.lazy_load_filter, - first_sync_filter = filter1, + first_sync_filter = sync_filter_ids.first, + sync_filter = sync_filter_ids.others, )) await self.sync_task break # task cancelled @@ -454,6 +454,35 @@ class MatrixClient(nio.AsyncClient): return (await self.content_repository_config()).upload_size + async def sync_filter_ids(self) -> SyncFilterIds: + """Return our sync/messages filter IDs, upload them if needed.""" + + async with self._sync_filter_ids_lock: + if self._sync_filter_ids: + return self._sync_filter_ids + + others = deepcopy(self.lazy_load_filter) + first = deepcopy(others) + + utils.dict_update_recursive(first, self.low_limit_filter) + + if self.backend.ui_settings["hideUnknownEvents"]: + first["room"]["timeline"]["not_types"].extend( + self.no_unknown_events_filter + ["room"]["timeline"]["not_types"], + ) + + others_id = (await self.upload_filter(**others)).filter_id + first_id = others_id + + if others != first: + resp = await self.upload_filter(**first) + first_id = resp.filter_id + + self._sync_filter_ids = SyncFilterIds(others_id, first_id) + return self._sync_filter_ids + + async def pause_while_offline(self) -> None: """Block until our account is online.""" while (