moment/harmonyqml/backend/signal_manager.py

440 lines
16 KiB
Python

# Copyright 2019 miruka
# This file is part of harmonyqml, licensed under GPLv3.
from concurrent.futures import ThreadPoolExecutor
from threading import Lock
from typing import Any, Deque, Dict, List, Optional
from PyQt5.QtCore import QDateTime, QObject, pyqtBoundSignal, pyqtSignal
import nio
from nio.rooms import MatrixRoom
from .backend import Backend
from .client import Client
from .model.items import (
Account, Device, ListModel, Room, RoomCategory, RoomEvent, User
)
from .model.sort_filter_proxy import SortFilterProxy
from .pyqt_future import futurize
Inviter = Optional[Dict[str, str]]
LeftEvent = Optional[Dict[str, str]]
class SignalManager(QObject):
roomCategoryChanged = pyqtSignal(str, str, str, str)
_lock: Lock = Lock()
def __init__(self, backend: Backend) -> None:
super().__init__(parent=backend)
self.pool: ThreadPoolExecutor = ThreadPoolExecutor(max_workers=6)
self.backend = backend
self.last_room_events: Deque[str] = Deque(maxlen=1000)
self._events_in_transfer: int = 0
self.backend.clients.clientAdded.connect(self.onClientAdded)
self.backend.clients.clientDeleted.connect(self.onClientDeleted)
def onClientAdded(self, client: Client) -> None:
if client.userId in self.backend.accounts:
return
# An user might already exist in the model, e.g. if another account
# was in a room with the account that we just connected to
self.backend.users.upsert(
where_main_key_is = client.userId,
update_with = User(
userId = client.userId,
displayName = self.backend.users[client.userId].displayName,
# Devices are added later, we might need to upload keys before
# but we want to show the accounts ASAP in the client side pane
devices = ListModel(),
)
)
# Backend.accounts
room_categories_kwargs: List[Dict[str, Any]] = [
{"name": "Invites", "rooms": ListModel()},
{"name": "Rooms", "rooms": ListModel()},
{"name": "Left", "rooms": ListModel()},
]
for i, _ in enumerate(room_categories_kwargs):
proxy = SortFilterProxy(
source_model = room_categories_kwargs[i]["rooms"],
sort_by_role = "lastEventDateTime",
filter_by_role = "displayName",
ascending = False,
)
room_categories_kwargs[i]["sortedRooms"] = proxy
self.backend.accounts.append(Account(
userId = client.userId,
roomCategories = ListModel([
RoomCategory(**kws) for kws in room_categories_kwargs
]),
))
# Upload our E2E keys to the matrix server if needed
if not client.nio.olm_account_shared:
client.uploadE2EKeys()
# Add all devices nio knows for this account
store = client.nio.device_store
for user_id in store.users:
user = self.backend.users.get(user_id, None)
if not user:
self.backend.users.append(
User(userId=user_id, devices=ListModel())
)
for device in store.active_user_devices(user_id):
self.backend.users[client.userId].devices.upsert(
where_main_key_is = device.id,
update_with = Device(
deviceId = device.id,
ed25519Key = device.ed25519,
trust = client.getDeviceTrust(device),
)
)
# Finally, connect all client signals
self.connectClient(client)
def onClientDeleted(self, user_id: str) -> None:
del self.backend.accounts[user_id]
def connectClient(self, client: Client) -> None:
for name in dir(client):
attr = getattr(client, name)
if isinstance(attr, pyqtBoundSignal):
def onSignal(*args, name=name) -> None:
func = getattr(self, f"on{name[0].upper()}{name[1:]}")
func(client, *args)
attr.connect(onSignal)
@staticmethod
def _get_room_displayname(nio_room: MatrixRoom) -> Optional[str]:
name = nio_room.name or nio_room.canonical_alias
if name:
return name
name = nio_room.group_name()
return None if name == "Empty room?" else name
def _add_users_from_nio_room(self, room: nio.rooms.MatrixRoom) -> None:
for user in room.users.values():
@futurize(running_value=user.display_name)
def get_displayname(self, user) -> str:
# pylint:disable=unused-argument
return user.display_name
self.backend.users.upsert(
where_main_key_is = user.user_id,
update_with = User(
userId = user.user_id,
displayName = get_displayname(self, user),
devices = ListModel()
),
ignore_roles = ("devices",),
)
def onRoomInvited(self,
client: Client,
room_id: str,
inviter: Inviter = None) -> None:
nio_room = client.nio.invited_rooms[room_id]
self._add_users_from_nio_room(nio_room)
categories = self.backend.accounts[client.userId].roomCategories
previous_room = categories["Rooms"].rooms.pop(room_id, None)
previous_left = categories["Left"].rooms.pop(room_id, None)
categories["Invites"].rooms.upsert(
where_main_key_is = room_id,
update_with = Room(
roomId = room_id,
displayName = self._get_room_displayname(nio_room),
topic = nio_room.topic,
inviter = inviter,
lastEventDateTime = QDateTime.currentDateTime(), # FIXME
members = list(nio_room.users.keys()),
),
ignore_roles = ("typingMembers"),
)
signal = self.roomCategoryChanged
if previous_room:
signal.emit(client.userId, room_id, "Rooms", "Invites")
elif previous_left:
signal.emit(client.userId, room_id, "Left", "Invites")
def onRoomJoined(self, client: Client, room_id: str) -> None:
nio_room = client.nio.rooms[room_id]
self._add_users_from_nio_room(nio_room)
categories = self.backend.accounts[client.userId].roomCategories
previous_invite = categories["Invites"].rooms.pop(room_id, None)
previous_left = categories["Left"].rooms.pop(room_id, None)
categories["Rooms"].rooms.upsert(
where_main_key_is = room_id,
update_with = Room(
roomId = room_id,
displayName = self._get_room_displayname(nio_room),
topic = nio_room.topic,
members = list(nio_room.users.keys()),
),
ignore_roles = ("typingMembers", "lastEventDateTime"),
)
signal = self.roomCategoryChanged
if previous_invite:
signal.emit(client.userId, room_id, "Invites", "Rooms")
elif previous_left:
signal.emit(client.userId, room_id, "Left", "Rooms")
def onRoomLeft(self,
client: Client,
room_id: str,
left_event: LeftEvent = None) -> None:
categories = self.backend.accounts[client.userId].roomCategories
previous_room = categories["Rooms"].rooms.pop(room_id, None)
previous_invite = categories["Invites"].rooms.pop(room_id, None)
previous = previous_room or previous_invite or \
categories["Left"].rooms.get(room_id, None)
left_time = left_event.get("server_timestamp") if left_event else None
categories["Left"].rooms.upsert(
where_main_key_is = room_id,
update_with = Room(
roomId = room_id,
displayName = previous.displayName if previous else None,
topic = previous.topic if previous else None,
leftEvent = left_event,
lastEventDateTime = (
QDateTime.fromMSecsSinceEpoch(left_time)
if left_time else QDateTime.currentDateTime()
),
),
ignore_roles = ("members", "lastEventDateTime"),
)
signal = self.roomCategoryChanged
if previous_room:
signal.emit(client.userId, room_id, "Rooms", "Left")
elif previous_invite:
signal.emit(client.userId, room_id, "Invites", "Left")
def onRoomSyncPrevBatchTokenReceived(self,
_: Client,
room_id: str,
token: str) -> None:
if room_id not in self.backend.past_tokens:
self.backend.past_tokens[room_id] = token
def onRoomPastPrevBatchTokenReceived(self,
_: Client,
room_id: str,
token: str) -> None:
if self.backend.past_tokens[room_id] == token:
self.backend.fully_loaded_rooms.add(room_id)
self.backend.past_tokens[room_id] = token
def _set_room_last_event(self, user_id: str, room_id: str, event: RoomEvent
) -> None:
for categ in self.backend.accounts[user_id].roomCategories:
if room_id in categ.rooms:
last = categ.rooms[room_id].lastEventDateTime
if last and last > event.dateTime:
continue
# Use setProperty to make sure to trigger model changed signals
categ.rooms.setProperty(
room_id, "lastEventDateTime", event.dateTime
)
def onRoomEventReceived(self,
client: Client,
room_id: str,
etype: str,
edict: Dict[str, Any]) -> None:
def process() -> Optional[RoomEvent]:
# Prevent duplicate events in models due to multiple accounts
if edict["event_id"] in self.last_room_events:
return None
self.last_room_events.appendleft(edict["event_id"])
model = self.backend.roomEvents[room_id]
date_time = QDateTime\
.fromMSecsSinceEpoch(edict["server_timestamp"])
new_event = RoomEvent(type=etype, dateTime=date_time, dict=edict)
event_is_our_profile_changed = (
etype == "RoomMemberEvent" and
edict.get("sender") in self.backend.clients and
((edict.get("content") or {}).get("membership") ==
(edict.get("prev_content") or {}).get("membership"))
)
if event_is_our_profile_changed:
return None
if etype == "RoomCreateEvent":
self.backend.fully_loaded_rooms.add(room_id)
if self._events_in_transfer:
local_echoes_met: int = 0
update_at: Optional[int] = None
# Find if any locally echoed event corresponds to new_event
for i, event in enumerate(model):
if not event.isLocalEcho:
continue
sb = (event.dict.get("sender"), event.dict.get("body"))
new_sb = (new_event.dict.get("sender"),
new_event.dict.get("body"))
if sb == new_sb:
# The oldest matching local echo shall be replaced
update_at = max(update_at or 0, i)
local_echoes_met += 1
if local_echoes_met >= self._events_in_transfer:
break
if update_at is not None:
model.update(update_at, new_event)
self._events_in_transfer -= 1
return new_event
for i, event in enumerate(model):
if event.isLocalEcho:
continue
# Model is sorted from newest to oldest message
if new_event.dateTime > event.dateTime:
model.insert(i, new_event)
return new_event
model.append(new_event)
return new_event
with self._lock:
new_event = process()
if new_event:
self._set_room_last_event(client.userId, room_id, new_event)
def onRoomTypingMembersUpdated(self,
client: Client,
room_id: str,
users: List[str]) -> None:
categories = self.backend.accounts[client.userId].roomCategories
for categ in categories:
try:
categ.rooms.setProperty(room_id, "typingMembers", users)
break
except ValueError:
pass
def onMessageAboutToBeSent(self,
client: Client,
room_id: str,
content: Dict[str, str]) -> None:
date_time = QDateTime.currentDateTime()
with self._lock:
model = self.backend.roomEvents[room_id]
nio_event = nio.events.RoomMessage.parse_event({
"event_id": "",
"sender": client.userId,
"origin_server_ts": date_time.toMSecsSinceEpoch(),
"content": content,
})
event = RoomEvent(
type = type(nio_event).__name__,
dict = nio_event.__dict__,
dateTime = date_time,
isLocalEcho = True,
)
model.insert(0, event)
self._events_in_transfer += 1
self._set_room_last_event(client.userId, room_id, event)
def onRoomAboutToBeForgotten(self, client: Client, room_id: str) -> None:
categories = self.backend.accounts[client.userId].roomCategories
for categ in categories:
categ.rooms.pop(room_id, None)
self.backend.roomEvents[room_id].clear()
def onDeviceIsPresent(self,
client: Client,
user_id: str,
device_id: str,
ed25519_key: str) -> None:
nio_device = client.nio.device_store[user_id][device_id]
user = self.backend.users.get(user_id, None)
if not user:
self.backend.users.append(
User(userId=user_id, devices=ListModel())
)
self.backend.users[user_id].devices.upsert(
where_main_key_is = device_id,
update_with = Device(
deviceId = device_id,
ed25519Key = ed25519_key,
trust = client.getDeviceTrust(nio_device),
)
)
def onDeviceIsDeleted(self, _: Client, user_id: str, device_id: str
) -> None:
try:
del self.backend.users[user_id].devices[device_id]
except ValueError:
pass