moment/harmonyqml/backend/client.py

342 lines
11 KiB
Python

# Copyright 2019 miruka
# This file is part of harmonyqml, licensed under GPLv3.
import logging as log
import time
from concurrent.futures import ThreadPoolExecutor
from threading import Event
from typing import DefaultDict, Tuple
from PyQt5.QtCore import (
QObject, QStandardPaths, pyqtProperty, pyqtSignal, pyqtSlot
)
import nio
from .model.items import Trust
from .network_manager import NetworkManager
from .pyqt_future import PyQtFuture, futurize
class Client(QObject):
roomInvited = pyqtSignal([str, dict], [str])
roomJoined = pyqtSignal(str)
roomLeft = pyqtSignal([str, dict], [str])
roomAboutToBeForgotten = pyqtSignal(str)
roomSyncPrevBatchTokenReceived = pyqtSignal(str, str)
roomPastPrevBatchTokenReceived = pyqtSignal(str, str)
roomEventReceived = pyqtSignal(str, str, dict)
roomTypingUsersUpdated = pyqtSignal(str, list)
messageAboutToBeSent = pyqtSignal(str, dict)
deviceIsPresent = pyqtSignal(str, str, str)
deviceIsDeleted = pyqtSignal(str, str)
def __init__(self,
manager,
hostname: str,
username: str,
device_id: str = "") -> None:
super().__init__(manager)
self.manager = manager
host, *port = hostname.split(":")
self.host: str = host
self.port: int = int(port[0]) if port else 443
self.pool: ThreadPoolExecutor = ThreadPoolExecutor(6)
store_path = self.manager.backend.getDir(
QStandardPaths.AppDataLocation
)
self.nio: nio.client.HttpClient = nio.client.HttpClient(
self.host, username, device_id, store_path=store_path
)
# Since nio clients can't handle more than one talk operation
# at a time, this one is used exclusively to poll the sync API
self.nio_sync: nio.client.HttpClient = nio.client.HttpClient(
self.host, username, device_id, store_path=store_path
)
self.net = NetworkManager(self.host, self.port, self.nio)
self.net_sync = NetworkManager(self.host, self.port, self.nio_sync)
self._stop_sync: Event = Event()
# {room_id: (was_typing, at_timestamp_secs)}
self._last_typing_set: DefaultDict[str, Tuple[bool, float]] = \
DefaultDict(lambda: (False, 0))
def __repr__(self) -> str:
return "%s(host=%r, port=%r, user_id=%r)" % \
(type(self).__name__, self.host, self.port, self.userId)
@pyqtProperty(str, constant=True)
def userId(self) -> str:
return self.nio.user_id
@futurize(max_running=1, discard_if_max_running=True, pyqt=False)
def uploadE2EKeys(self) -> None:
self.net.talk(self.nio.keys_upload)
def queryE2EKeys(self) -> None:
print("query")
self._on_query_e2e_keys(self.net.talk(self.nio.keys_query))
def _on_query_e2e_keys(self, response: nio.KeysQueryResponse) -> None:
for user_id, device_dict in response.device_keys.items():
for device_id, payload in device_dict.items():
if device_id == self.nio.device_id:
continue
ed25519_key = payload["keys"][f"ed25519:{device_id}"]
self.deviceIsPresent.emit(user_id, device_id, ed25519_key)
for device_id, device in self.nio.device_store[user_id].items():
if device.deleted:
self.deviceIsDeleted.emit(user_id, device_id)
def claimE2EKeysForRoom(self, room_id: str) -> None:
self.net.talk(self.nio.keys_claim, room_id)
def shareRoomE2ESession(self,
room_id: str,
ignore_missing_sessions: bool = False) -> None:
self.net.talk(
self.nio.share_group_session,
room_id = room_id,
ignore_missing_sessions = ignore_missing_sessions,
)
def getDeviceTrust(self, device: nio.crypto.OlmDevice) -> Trust:
olm = self.nio.olm
return (
Trust.trusted if olm.is_device_verified(device) else
Trust.blacklisted if olm.is_device_blacklisted(device) else
Trust.undecided
)
@pyqtSlot(str, result="QVariant")
@pyqtSlot(str, str, result="QVariant")
@futurize()
def login(self, password: str, device_name: str = "") -> "Client":
# Main nio client will receive the response here
response = self.net.talk(self.nio.login, password, device_name)
# Now, receive it with the sync nio client too:
self.nio_sync.receive_response(response)
return self
@pyqtSlot(str, str, str, result="QVariant")
@futurize()
def resumeSession(self, user_id: str, token: str, device_id: str
) -> "Client":
response = nio.LoginResponse(user_id, device_id, token)
self.nio.receive_response(response)
self.nio_sync.receive_response(response)
return self
@pyqtSlot(result="QVariant")
@futurize()
def logout(self) -> "Client":
self._stop_sync.set()
self.net.http_disconnect()
self.net_sync.http_disconnect()
return self
@futurize(pyqt=False)
def startSyncing(self) -> None:
while True:
try:
response = self.net_sync.talk(self.nio_sync.sync, timeout=8000)
except nio.LocalProtocolError: # logout occured
pass
else:
self._on_sync(response)
if self._stop_sync.is_set():
self._stop_sync.clear()
break
def _on_sync(self, response: nio.SyncResponse) -> None:
self.nio.receive_response(response)
if self.nio.should_upload_keys:
self.uploadE2EKeys()
if self.nio.should_query_keys:
self.queryE2EKeys()
for room_id, room_info in response.rooms.invite.items():
for ev in room_info.invite_state:
member_ev = isinstance(ev, nio.InviteMemberEvent)
if member_ev and ev.content["membership"] == "join":
self.roomInvited.emit(room_id, ev.content)
break
else:
self.roomInvited[str].emit(room_id)
for room_id, room_info in response.rooms.join.items():
self.roomJoined.emit(room_id)
self.roomSyncPrevBatchTokenReceived.emit(
room_id, room_info.timeline.prev_batch
)
for ev in room_info.timeline.events:
self.roomEventReceived.emit(
room_id, type(ev).__name__, ev.__dict__
)
for ev in room_info.ephemeral:
if isinstance(ev, nio.TypingNoticeEvent):
self.roomTypingUsersUpdated.emit(room_id, ev.users)
else:
print("ephemeral event: ", ev)
for room_id, room_info in response.rooms.leave.items():
for ev in room_info.timeline.events:
member_ev = isinstance(ev, nio.RoomMemberEvent)
if member_ev and ev.content["membership"] in ("leave", "ban"):
self.roomLeft.emit(room_id, ev.__dict__)
break
else:
self.roomLeft[str].emit(room_id)
@futurize(max_running=1, discard_if_max_running=True)
def loadPastEvents(self, room_id: str, start_token: str, limit: int = 100
) -> None:
# From QML, use Backend.loastPastEvents instead
self._on_past_events(
room_id,
self.net.talk(
self.nio.room_messages, room_id, start=start_token, limit=limit
)
)
def _on_past_events(self, room_id: str, response: nio.RoomMessagesResponse
) -> None:
self.roomPastPrevBatchTokenReceived.emit(room_id, response.end)
for ev in response.chunk:
self.roomEventReceived.emit(
room_id, type(ev).__name__, ev.__dict__
)
@pyqtSlot(str, bool)
@futurize(max_running=1, discard_if_max_running=True)
def setTypingState(self, room_id: str, typing: bool) -> None:
set_for_secs = 5
last_set, last_time = self._last_typing_set[room_id]
if not typing and last_set is False:
return
if typing and time.time() - last_time < set_for_secs - 1:
return
self._last_typing_set[room_id] = (typing, time.time())
self.net.talk(
self.nio.room_typing,
room_id = room_id,
typing_state = typing,
timeout = set_for_secs * 1000,
)
@pyqtSlot(str, str)
def sendMarkdown(self, room_id: str, text: str) -> PyQtFuture:
html = self.manager.backend.htmlFilter.fromMarkdown(text)
content = {
"body": text,
"formatted_body": html,
"format": "org.matrix.custom.html",
"msgtype": "m.text",
}
self.messageAboutToBeSent.emit(room_id, content)
# If the thread pool workers are all occupied, and @futurize
# wrapped sendMarkdown, the messageAboutToBeSent signal neccessary
# for local echoes would not be sent until a thread is free.
#
# send() only takes the room_id argument explicitely because
# of consider_args=True: This means the max number of messages being
# sent at a time is one per room at a time.
@futurize(max_running=1, consider_args=True)
def send(self, room_id: str) -> PyQtFuture:
talk = lambda: self.net.talk(
self.nio.room_send,
room_id = room_id,
message_type = "m.room.message",
content = content,
)
try:
log.debug("Try sending message %r to %r", content, room_id)
return talk()
except nio.GroupEncryptionError as err:
log.warning(err)
try:
self.shareRoomE2ESession(room_id)
except nio.EncryptionError as err:
log.warning(err)
self.claimE2EKeysForRoom(room_id)
self.shareRoomE2ESession(room_id,
ignore_missing_sessions=True)
log.debug("Final try to send %r to %r", content, room_id)
return talk()
return send(self, room_id)
@pyqtSlot(str, result="QVariant")
@futurize()
def joinRoom(self, room_id: str) -> None:
return self.net.talk(self.nio.join, room_id=room_id)
@pyqtSlot(str, result="QVariant")
@futurize()
def leaveRoom(self, room_id: str) -> None:
return self.net.talk(self.nio.room_leave, room_id=room_id)
@pyqtSlot(str, result="QVariant")
@futurize()
def forgetRoom(self, room_id: str) -> None:
self.roomAboutToBeForgotten.emit(room_id)
response = self.net.talk(self.nio.room_forget, room_id=room_id)
self.nio.invalidate_outbound_session(room_id)
return response
@pyqtSlot(str, result=bool)
def roomHasUnknownDevices(self, room_id: str) -> bool:
return self.nio.room_contains_unverified(room_id)