moment/harmonyqml/backend/backend.py
miruka d08f43e6be Add new queuing features to @futurize
- max_instances renamed to max_running

- consider_args parameter:
  if True, $max_running of this function with the same arguments can be
  running, else:
  $max_running of this function, no matter the arguments, can be running

- discard_if_max_running:
  if True and there are already the maximum possible number of running
  functions running, cancel this task (previous default behavior), else:
  Wait for a spot to be free before running
2019-05-08 10:32:45 -04:00

151 lines
4.6 KiB
Python

# Copyright 2019 miruka
# This file is part of harmonyqml, licensed under GPLv3.
import os
from concurrent.futures import ThreadPoolExecutor
from typing import Deque, Dict, Optional, Sequence, Set
from atomicfile import AtomicFile
from PyQt5.QtCore import QObject, QStandardPaths, pyqtProperty, pyqtSlot
from .html_filter import HtmlFilter
from .model import ListModel, ListModelMap
from .pyqt_future import futurize
class Backend(QObject):
def __init__(self, parent: QObject) -> None:
super().__init__(parent)
self.pool: ThreadPoolExecutor = ThreadPoolExecutor(max_workers=6)
self._queried_displaynames: Dict[str, str] = {}
self.past_tokens: Dict[str, str] = {}
self.fully_loaded_rooms: Set[str] = set()
self._html_filter: HtmlFilter = HtmlFilter(self)
from .client_manager import ClientManager
self._client_manager: ClientManager = ClientManager(self)
self._accounts: ListModel = ListModel(parent=parent)
self._room_events: ListModelMap = ListModelMap(Deque, parent)
from .signal_manager import SignalManager
self._signal_manager: SignalManager = SignalManager(self)
self.clients.configLoad()
@pyqtProperty("QVariant", constant=True)
def htmlFilter(self):
return self._html_filter
@pyqtProperty("QVariant", constant=True)
def clients(self):
return self._client_manager
@pyqtProperty("QVariant", constant=True)
def accounts(self):
return self._accounts
@pyqtProperty("QVariant", constant=True)
def roomEvents(self):
return self._room_events
@pyqtProperty("QVariant", constant=True)
def signals(self):
return self._signal_manager
@pyqtSlot(str, result="QVariant")
@pyqtSlot(str, bool, result="QVariant")
@futurize(max_running=1, consider_args=True)
def getUserDisplayName(self, user_id: str, can_block: bool = True) -> str:
if user_id in self._queried_displaynames:
return self._queried_displaynames[user_id]
for client in self.clients.values():
for room in client.nio.rooms.values():
displayname = room.user_name(user_id)
if displayname:
return displayname
return self._query_user_displayname(user_id) if can_block else user_id
def _query_user_displayname(self, user_id: str) -> str:
client = next(iter(self.clients.values()))
response = client.net.talk(client.nio.get_displayname, user_id)
displayname = getattr(response, "displayname", "") or user_id
self._queried_displaynames[user_id] = displayname
return displayname
@pyqtSlot(str, result=float)
def hueFromString(self, string: str) -> float:
# pylint:disable=no-self-use
return sum((ord(char) * 99 for char in string)) % 360 / 360
@pyqtSlot(str)
@pyqtSlot(str, int)
def loadPastEvents(self, room_id: str, limit: int = 100) -> None:
if not room_id in self.past_tokens:
return # Initial sync not done yet
if room_id in self.fully_loaded_rooms:
return
for client in self.clients.values():
if room_id in client.nio.rooms:
client.loadPastEvents(
room_id, self.past_tokens[room_id], limit
)
break
@staticmethod
def getDir(standard_dir: QStandardPaths.StandardLocation) -> str:
path = QStandardPaths.writableLocation(standard_dir)
os.makedirs(path, exist_ok=True)
return path
def getFile(self,
standard_dir: QStandardPaths.StandardLocation,
relative_file_path: str,
initial_content: Optional[str] = None) -> str:
relative_file_path = relative_file_path.replace("/", os.sep)
path = QStandardPaths.locate(standard_dir, relative_file_path)
if path:
return path
path = os.path.join(self.getDir(standard_dir), relative_file_path)
if initial_content is not None:
with AtomicFile(path, "w") as new:
new.write(initial_content)
return path
@pyqtSlot()
@pyqtSlot(list)
def pdb(self, additional_data: Sequence = ()) -> None:
# pylint: disable=all
ad = additional_data
cl = self.clients
ac = self.accounts
re = self.roomEvents
tcl = lambda user: cl[f"@test_{user}:matrix.org"]
import pdb
from PyQt5.QtCore import pyqtRemoveInputHook
pyqtRemoveInputHook()
pdb.set_trace()