Load past events when scrolling up
Also keep two nio clients internally: one for loop-syncing, one for the rest; since they can't handle more than one talk at a time.
This commit is contained in:
parent
9e5e2c6718
commit
a0f9acddaa
2
TODO.md
2
TODO.md
|
@ -17,8 +17,6 @@
|
||||||
- Use Loader? for MessageDelegate to show sub-components based on condition
|
- Use Loader? for MessageDelegate to show sub-components based on condition
|
||||||
- Better names and organization for the Message components
|
- Better names and organization for the Message components
|
||||||
|
|
||||||
- Load previous events on scroll up
|
|
||||||
|
|
||||||
- Migrate more JS functions to their own files
|
- Migrate more JS functions to their own files
|
||||||
|
|
||||||
- Accept room\_id arg for getUser
|
- Accept room\_id arg for getUser
|
||||||
|
|
|
@ -15,6 +15,8 @@ from .html_filter import HtmlFilter
|
||||||
class Backend(QObject):
|
class Backend(QObject):
|
||||||
def __init__(self) -> None:
|
def __init__(self) -> None:
|
||||||
super().__init__()
|
super().__init__()
|
||||||
|
self.past_tokens: Dict[str, str] = {}
|
||||||
|
|
||||||
self._client_manager: ClientManager = ClientManager()
|
self._client_manager: ClientManager = ClientManager()
|
||||||
self._models: QMLModels = QMLModels()
|
self._models: QMLModels = QMLModels()
|
||||||
self._html_filter: HtmlFilter = HtmlFilter()
|
self._html_filter: HtmlFilter = HtmlFilter()
|
||||||
|
@ -56,3 +58,16 @@ class Backend(QObject):
|
||||||
# pylint:disable=no-self-use
|
# pylint:disable=no-self-use
|
||||||
md5 = hashlib.md5(bytes(string, "utf-8")).hexdigest()
|
md5 = hashlib.md5(bytes(string, "utf-8")).hexdigest()
|
||||||
return float("0.%s" % int(md5[-10:], 16))
|
return float("0.%s" % int(md5[-10:], 16))
|
||||||
|
|
||||||
|
|
||||||
|
@pyqtSlot(str)
|
||||||
|
def loadPastEvents(self, room_id: str) -> None:
|
||||||
|
if not room_id in self.past_tokens:
|
||||||
|
return # Initial sync not done yet
|
||||||
|
|
||||||
|
for client in self.clientManager.clients.values():
|
||||||
|
if room_id in client.nio.rooms:
|
||||||
|
client.loadPastEvents(room_id, self.past_tokens[room_id])
|
||||||
|
break
|
||||||
|
else:
|
||||||
|
raise ValueError(f"Room not found in any client: {room_id}")
|
||||||
|
|
|
@ -14,6 +14,8 @@ from PyQt5.QtCore import QObject, pyqtProperty, pyqtSignal, pyqtSlot
|
||||||
import nio
|
import nio
|
||||||
import nio.responses as nr
|
import nio.responses as nr
|
||||||
|
|
||||||
|
from .network_manager import NetworkManager
|
||||||
|
|
||||||
# One pool per hostname/remote server;
|
# One pool per hostname/remote server;
|
||||||
# multiple Client for different accounts on the same server can exist.
|
# multiple Client for different accounts on the same server can exist.
|
||||||
_POOLS: DefaultDict[str, ThreadPoolExecutor] = \
|
_POOLS: DefaultDict[str, ThreadPoolExecutor] = \
|
||||||
|
@ -37,11 +39,13 @@ def futurize(func: Callable) -> Callable:
|
||||||
|
|
||||||
|
|
||||||
class Client(QObject):
|
class Client(QObject):
|
||||||
roomInvited = pyqtSignal(str)
|
roomInvited = pyqtSignal(str)
|
||||||
roomJoined = pyqtSignal(str)
|
roomJoined = pyqtSignal(str)
|
||||||
roomLeft = pyqtSignal(str)
|
roomLeft = pyqtSignal(str)
|
||||||
roomEventReceived = pyqtSignal(str, str, dict)
|
roomSyncPrevBatchTokenReceived = pyqtSignal(str, str)
|
||||||
roomTypingUsersUpdated = pyqtSignal(str, list)
|
roomPastPrevBatchTokenReceived = pyqtSignal(str, str)
|
||||||
|
roomEventReceived = pyqtSignal(str, str, dict)
|
||||||
|
roomTypingUsersUpdated = pyqtSignal(str, list)
|
||||||
|
|
||||||
|
|
||||||
def __init__(self, hostname: str, username: str, device_id: str = ""
|
def __init__(self, hostname: str, username: str, device_id: str = ""
|
||||||
|
@ -52,13 +56,18 @@ class Client(QObject):
|
||||||
self.host: str = host
|
self.host: str = host
|
||||||
self.port: int = int(port[0]) if port else 443
|
self.port: int = int(port[0]) if port else 443
|
||||||
|
|
||||||
|
self.pool: ThreadPoolExecutor = _POOLS[self.host]
|
||||||
|
|
||||||
self.nio: nio.client.HttpClient = \
|
self.nio: nio.client.HttpClient = \
|
||||||
nio.client.HttpClient(self.host, username, device_id)
|
nio.client.HttpClient(self.host, username, device_id)
|
||||||
|
|
||||||
self.pool: ThreadPoolExecutor = _POOLS[self.host]
|
# Since nio clients can't handle more than one talk operation
|
||||||
|
# at a time, this one is used exclusively to poll the sync API
|
||||||
|
self.nio_sync: nio.client.HttpClient = \
|
||||||
|
nio.client.HttpClient(self.host, username, device_id)
|
||||||
|
|
||||||
from .network_manager import NetworkManager
|
self.net = NetworkManager(self.host, self.port, self.nio)
|
||||||
self.net: NetworkManager = NetworkManager(self)
|
self.net_sync = NetworkManager(self.host, self.port, self.nio_sync)
|
||||||
|
|
||||||
self._stop_sync: Event = Event()
|
self._stop_sync: Event = Event()
|
||||||
|
|
||||||
|
@ -78,7 +87,10 @@ class Client(QObject):
|
||||||
@futurize
|
@futurize
|
||||||
def login(self, password: str, device_name: str = "") -> None:
|
def login(self, password: str, device_name: str = "") -> None:
|
||||||
self.net.write(self.nio.connect())
|
self.net.write(self.nio.connect())
|
||||||
self.net.talk(self.nio.login, password, device_name)
|
response = self.net.talk(self.nio.login, password, device_name)
|
||||||
|
|
||||||
|
self.net_sync.write(self.nio_sync.connect())
|
||||||
|
self.nio_sync.receive_response(response)
|
||||||
self.startSyncing()
|
self.startSyncing()
|
||||||
|
|
||||||
|
|
||||||
|
@ -89,6 +101,9 @@ class Client(QObject):
|
||||||
self.net.write(self.nio.connect())
|
self.net.write(self.nio.connect())
|
||||||
response = nr.LoginResponse(user_id, device_id, token)
|
response = nr.LoginResponse(user_id, device_id, token)
|
||||||
self.nio.receive_response(response)
|
self.nio.receive_response(response)
|
||||||
|
|
||||||
|
self.net_sync.write(self.nio_sync.connect())
|
||||||
|
self.nio_sync.receive_response(response)
|
||||||
self.startSyncing()
|
self.startSyncing()
|
||||||
|
|
||||||
|
|
||||||
|
@ -97,13 +112,16 @@ class Client(QObject):
|
||||||
def logout(self) -> None:
|
def logout(self) -> None:
|
||||||
self._stop_sync.set()
|
self._stop_sync.set()
|
||||||
self.net.write(self.nio.disconnect())
|
self.net.write(self.nio.disconnect())
|
||||||
|
self.net_sync.write(self.nio_sync.disconnect())
|
||||||
|
|
||||||
|
|
||||||
@pyqtSlot()
|
@pyqtSlot()
|
||||||
@futurize
|
@futurize
|
||||||
def startSyncing(self) -> None:
|
def startSyncing(self) -> None:
|
||||||
while True:
|
while True:
|
||||||
self._on_sync(self.net.talk(self.nio.sync, timeout=10_000))
|
self._on_sync(self.net_sync.talk(
|
||||||
|
self.nio_sync.sync, timeout=10_000
|
||||||
|
))
|
||||||
|
|
||||||
if self._stop_sync.is_set():
|
if self._stop_sync.is_set():
|
||||||
self._stop_sync.clear()
|
self._stop_sync.clear()
|
||||||
|
@ -111,12 +129,18 @@ class Client(QObject):
|
||||||
|
|
||||||
|
|
||||||
def _on_sync(self, response: nr.SyncResponse) -> None:
|
def _on_sync(self, response: nr.SyncResponse) -> None:
|
||||||
|
self.nio.receive_response(response)
|
||||||
|
|
||||||
for room_id in response.rooms.invite:
|
for room_id in response.rooms.invite:
|
||||||
self.roomInvited.emit(room_id)
|
self.roomInvited.emit(room_id)
|
||||||
|
|
||||||
for room_id, room_info in response.rooms.join.items():
|
for room_id, room_info in response.rooms.join.items():
|
||||||
self.roomJoined.emit(room_id)
|
self.roomJoined.emit(room_id)
|
||||||
|
|
||||||
|
self.roomSyncPrevBatchTokenReceived.emit(
|
||||||
|
room_id, room_info.timeline.prev_batch
|
||||||
|
)
|
||||||
|
|
||||||
for ev in room_info.timeline.events:
|
for ev in room_info.timeline.events:
|
||||||
self.roomEventReceived.emit(
|
self.roomEventReceived.emit(
|
||||||
room_id, type(ev).__name__, ev.__dict__
|
room_id, type(ev).__name__, ev.__dict__
|
||||||
|
@ -130,3 +154,24 @@ class Client(QObject):
|
||||||
|
|
||||||
for room_id in response.rooms.leave:
|
for room_id in response.rooms.leave:
|
||||||
self.roomLeft.emit(room_id)
|
self.roomLeft.emit(room_id)
|
||||||
|
|
||||||
|
|
||||||
|
@futurize
|
||||||
|
def loadPastEvents(self, room_id: str, start_token: str) -> None:
|
||||||
|
# From QML, use Backend.loastPastEvents instead
|
||||||
|
self._on_past_events(
|
||||||
|
room_id,
|
||||||
|
self.net.talk(
|
||||||
|
self.nio.room_messages, room_id, start=start_token, limit=100
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def _on_past_events(self, room_id: str, response: nr.RoomMessagesResponse
|
||||||
|
) -> None:
|
||||||
|
self.roomPastPrevBatchTokenReceived.emit(room_id, response.end)
|
||||||
|
|
||||||
|
for ev in response.chunk:
|
||||||
|
self.roomEventReceived.emit(
|
||||||
|
room_id, type(ev).__name__, ev.__dict__
|
||||||
|
)
|
||||||
|
|
|
@ -41,11 +41,11 @@ class HtmlFilter(QObject):
|
||||||
@pyqtSlot(str, result=str)
|
@pyqtSlot(str, result=str)
|
||||||
def filter(self, html: str) -> str:
|
def filter(self, html: str) -> str:
|
||||||
html = self._sanitizer.sanitize(html)
|
html = self._sanitizer.sanitize(html)
|
||||||
if not html:
|
|
||||||
return ""
|
|
||||||
|
|
||||||
tree = etree.fromstring(html, parser=etree.HTMLParser())
|
tree = etree.fromstring(html, parser=etree.HTMLParser())
|
||||||
|
|
||||||
|
if tree is None:
|
||||||
|
return ""
|
||||||
|
|
||||||
for el in tree.iter("img"):
|
for el in tree.iter("img"):
|
||||||
el = self._wrap_img_in_a(el)
|
el = self._wrap_img_in_a(el)
|
||||||
|
|
||||||
|
|
|
@ -5,13 +5,13 @@ import logging
|
||||||
import socket
|
import socket
|
||||||
import ssl
|
import ssl
|
||||||
import time
|
import time
|
||||||
|
from threading import Lock
|
||||||
from typing import Callable, Optional, Tuple
|
from typing import Callable, Optional, Tuple
|
||||||
from uuid import UUID
|
from uuid import UUID
|
||||||
|
|
||||||
|
import nio
|
||||||
import nio.responses as nr
|
import nio.responses as nr
|
||||||
|
|
||||||
from .client import Client
|
|
||||||
|
|
||||||
OptSock = Optional[ssl.SSLSocket]
|
OptSock = Optional[ssl.SSLSocket]
|
||||||
NioRequestFunc = Callable[..., Tuple[UUID, bytes]]
|
NioRequestFunc = Callable[..., Tuple[UUID, bytes]]
|
||||||
|
|
||||||
|
@ -26,16 +26,21 @@ class NetworkManager:
|
||||||
http_retry_codes = {408, 429, 500, 502, 503, 504, 507}
|
http_retry_codes = {408, 429, 500, 502, 503, 504, 507}
|
||||||
|
|
||||||
|
|
||||||
def __init__(self, client: Client) -> None:
|
def __init__(self, host: str, port: int, nio_client: nio.client.HttpClient
|
||||||
self.client = client
|
) -> None:
|
||||||
|
self.host = host
|
||||||
|
self.port = port
|
||||||
|
self.nio = nio_client
|
||||||
|
|
||||||
self._ssl_context: ssl.SSLContext = ssl.create_default_context()
|
self._ssl_context: ssl.SSLContext = ssl.create_default_context()
|
||||||
self._ssl_session: Optional[ssl.SSLSession] = None
|
self._ssl_session: Optional[ssl.SSLSession] = None
|
||||||
|
self._lock: Lock = Lock()
|
||||||
|
|
||||||
|
|
||||||
def _get_socket(self) -> ssl.SSLSocket:
|
def _get_socket(self) -> ssl.SSLSocket:
|
||||||
sock = self._ssl_context.wrap_socket( # type: ignore
|
sock = self._ssl_context.wrap_socket( # type: ignore
|
||||||
socket.create_connection((self.client.host, self.client.port)),
|
socket.create_connection((self.host, self.port)),
|
||||||
server_hostname = self.client.host,
|
server_hostname = self.host,
|
||||||
session = self._ssl_session,
|
session = self._ssl_session,
|
||||||
)
|
)
|
||||||
self._ssl_session = self._ssl_session or sock.session
|
self._ssl_session = self._ssl_session or sock.session
|
||||||
|
@ -53,8 +58,12 @@ class NetworkManager:
|
||||||
|
|
||||||
response = None
|
response = None
|
||||||
while not response:
|
while not response:
|
||||||
self.client.nio.receive(sock.recv(4096))
|
left_to_send = self.nio.data_to_send()
|
||||||
response = self.client.nio.next_response()
|
if left_to_send:
|
||||||
|
self.write(left_to_send, sock)
|
||||||
|
|
||||||
|
self.nio.receive(sock.recv(4096))
|
||||||
|
response = self.nio.next_response()
|
||||||
|
|
||||||
if isinstance(response, nr.ErrorResponse):
|
if isinstance(response, nr.ErrorResponse):
|
||||||
raise NioErrorResponse(response)
|
raise NioErrorResponse(response)
|
||||||
|
@ -66,6 +75,9 @@ class NetworkManager:
|
||||||
|
|
||||||
|
|
||||||
def write(self, data: bytes, with_sock: OptSock = None) -> None:
|
def write(self, data: bytes, with_sock: OptSock = None) -> None:
|
||||||
|
if not data:
|
||||||
|
return
|
||||||
|
|
||||||
sock = with_sock or self._get_socket()
|
sock = with_sock or self._get_socket()
|
||||||
sock.sendall(data)
|
sock.sendall(data)
|
||||||
|
|
||||||
|
@ -74,29 +86,30 @@ class NetworkManager:
|
||||||
|
|
||||||
|
|
||||||
def talk(self, nio_func: NioRequestFunc, *args, **kwargs) -> nr.Response:
|
def talk(self, nio_func: NioRequestFunc, *args, **kwargs) -> nr.Response:
|
||||||
while True:
|
with self._lock:
|
||||||
to_send = nio_func(*args, **kwargs)[1]
|
while True:
|
||||||
sock = self._get_socket()
|
to_send = nio_func(*args, **kwargs)[1]
|
||||||
|
sock = self._get_socket()
|
||||||
|
|
||||||
try:
|
try:
|
||||||
self.write(to_send, sock)
|
self.write(to_send, sock)
|
||||||
response = self.read(sock)
|
response = self.read(sock)
|
||||||
|
|
||||||
except NioErrorResponse as err:
|
except NioErrorResponse as err:
|
||||||
logging.error("read bad response for %s: %s", nio_func, err)
|
logging.error("bad read for %s: %s", nio_func, err)
|
||||||
self._close_socket(sock)
|
self._close_socket(sock)
|
||||||
|
|
||||||
if self._should_abort_talk(err):
|
if self._should_abort_talk(err):
|
||||||
logging.error("aborting talk")
|
logging.error("aborting talk")
|
||||||
|
break
|
||||||
|
|
||||||
|
time.sleep(10)
|
||||||
|
|
||||||
|
else:
|
||||||
break
|
break
|
||||||
|
|
||||||
time.sleep(10)
|
self._close_socket(sock)
|
||||||
|
return response
|
||||||
else:
|
|
||||||
break
|
|
||||||
|
|
||||||
self._close_socket(sock)
|
|
||||||
return response
|
|
||||||
|
|
||||||
|
|
||||||
def _should_abort_talk(self, err: NioErrorResponse) -> bool:
|
def _should_abort_talk(self, err: NioErrorResponse) -> bool:
|
||||||
|
|
|
@ -81,6 +81,21 @@ class SignalManager(QObject):
|
||||||
del rooms[rooms.indexWhere("room_id", room_id)]
|
del rooms[rooms.indexWhere("room_id", room_id)]
|
||||||
|
|
||||||
|
|
||||||
|
def onRoomSyncPrevBatchTokenReceived(
|
||||||
|
self, _: Client, room_id: str, token: str
|
||||||
|
) -> None:
|
||||||
|
|
||||||
|
if room_id not in self.backend.past_tokens:
|
||||||
|
self.backend.past_tokens[room_id] = token
|
||||||
|
|
||||||
|
|
||||||
|
def onRoomPastPrevBatchTokenReceived(
|
||||||
|
self, _: Client, room_id: str, token: str
|
||||||
|
) -> None:
|
||||||
|
|
||||||
|
self.backend.past_tokens[room_id] = token
|
||||||
|
|
||||||
|
|
||||||
def onRoomEventReceived(
|
def onRoomEventReceived(
|
||||||
self, _: Client, room_id: str, etype: str, edict: Dict[str, Any]
|
self, _: Client, room_id: str, etype: str, edict: Dict[str, Any]
|
||||||
) -> None:
|
) -> None:
|
||||||
|
|
|
@ -26,14 +26,8 @@ Rectangle {
|
||||||
// reloaded from network.
|
// reloaded from network.
|
||||||
cacheBuffer: height * 6
|
cacheBuffer: height * 6
|
||||||
|
|
||||||
function goToEnd() {
|
onMovementEnded: if (atYBeginning) {
|
||||||
messageListView.positionViewAtEnd()
|
Backend.loadPastEvents(chatPage.room.room_id)
|
||||||
//messageListView.flick(0, -messageListView.bottomMargin * 100)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
//Connections {
|
|
||||||
//target: messageListView.model
|
|
||||||
//onChanged: goToEnd()
|
|
||||||
//}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue
Block a user