Add new queuing features to @futurize

- max_instances renamed to max_running

- consider_args parameter:
  if True, $max_running of this function with the same arguments can be
  running, else:
  $max_running of this function, no matter the arguments, can be running

- discard_if_max_running:
  if True and there are already the maximum possible number of running
  functions running, cancel this task (previous default behavior), else:
  Wait for a spot to be free before running
This commit is contained in:
miruka 2019-05-08 10:22:52 -04:00
parent 0bd98a321d
commit d08f43e6be
4 changed files with 68 additions and 27 deletions

View File

@ -4,6 +4,7 @@
- Cleanup unused icons - Cleanup unused icons
- Bug fixes - Bug fixes
- Local echo messages all have the same time
- Something weird happening when nio store is created first time - Something weird happening when nio store is created first time
- 100% CPU usage when hitting top edge to trigger messages loading - 100% CPU usage when hitting top edge to trigger messages loading
- Sending `![A picture](https://picsum.photos/256/256)` → not clickable? - Sending `![A picture](https://picsum.photos/256/256)` → not clickable?
@ -38,6 +39,7 @@
- Status message and presence - Status message and presence
- Client improvements - Client improvements
- Don't send setTypingState False when focus lost if nothing in sendbox
- Initial sync filter and lazy load, see weechat-matrix `_handle_login()` - Initial sync filter and lazy load, see weechat-matrix `_handle_login()`
- See also `handle_response()`'s `keys_query` request - See also `handle_response()`'s `keys_query` request
- HTTP/2 - HTTP/2

View File

@ -60,7 +60,7 @@ class Backend(QObject):
@pyqtSlot(str, result="QVariant") @pyqtSlot(str, result="QVariant")
@pyqtSlot(str, bool, result="QVariant") @pyqtSlot(str, bool, result="QVariant")
@futurize() @futurize(max_running=1, consider_args=True)
def getUserDisplayName(self, user_id: str, can_block: bool = True) -> str: def getUserDisplayName(self, user_id: str, can_block: bool = True) -> str:
if user_id in self._queried_displaynames: if user_id in self._queried_displaynames:
return self._queried_displaynames[user_id] return self._queried_displaynames[user_id]

View File

@ -62,8 +62,6 @@ class Client(QObject):
self.net = NetworkManager(self.host, self.port, self.nio) self.net = NetworkManager(self.host, self.port, self.nio)
self.net_sync = NetworkManager(self.host, self.port, self.nio_sync) self.net_sync = NetworkManager(self.host, self.port, self.nio_sync)
self._loading: bool = False
self._stop_sync: Event = Event() self._stop_sync: Event = Event()
# {room_id: (was_typing, at_timestamp_secs)} # {room_id: (was_typing, at_timestamp_secs)}
@ -81,15 +79,13 @@ class Client(QObject):
return self.nio.user_id return self.nio.user_id
@futurize(pyqt=False) @futurize(max_running=1, discard_if_max_running=True, pyqt=False)
def _keys_upload(self) -> None: def _keys_upload(self) -> None:
print("uploading key")
self.net.talk(self.nio.keys_upload) self.net.talk(self.nio.keys_upload)
@futurize(max_instances=1, pyqt=False) @futurize(max_running=1, discard_if_max_running=True, pyqt=False)
def _keys_query(self) -> None: def _keys_query(self) -> None:
print("querying keys")
self.net.talk(self.nio.keys_query) self.net.talk(self.nio.keys_query)
@ -192,22 +188,17 @@ class Client(QObject):
self.roomLeft[str].emit(room_id) self.roomLeft[str].emit(room_id)
@futurize() @futurize(max_running=1, discard_if_max_running=True)
def loadPastEvents(self, room_id: str, start_token: str, limit: int = 100 def loadPastEvents(self, room_id: str, start_token: str, limit: int = 100
) -> None: ) -> None:
# From QML, use Backend.loastPastEvents instead # From QML, use Backend.loastPastEvents instead
if self._loading:
return
self._loading = True
self._on_past_events( self._on_past_events(
room_id, room_id,
self.net.talk( self.net.talk(
self.nio.room_messages, room_id, start=start_token, limit=limit self.nio.room_messages, room_id, start=start_token, limit=limit
) )
) )
self._loading = False
def _on_past_events(self, room_id: str, response: nio.RoomMessagesResponse def _on_past_events(self, room_id: str, response: nio.RoomMessagesResponse
@ -221,7 +212,7 @@ class Client(QObject):
@pyqtSlot(str, bool) @pyqtSlot(str, bool)
@futurize(max_instances=1) @futurize(max_running=1, discard_if_max_running=True)
def setTypingState(self, room_id: str, typing: bool) -> None: def setTypingState(self, room_id: str, typing: bool) -> None:
set_for_secs = 5 set_for_secs = 5
last_set, last_time = self._last_typing_set[room_id] last_set, last_time = self._last_typing_set[room_id]
@ -256,7 +247,7 @@ class Client(QObject):
# If the thread pool workers are all occupied, and @futurize # If the thread pool workers are all occupied, and @futurize
# wrapped sendMarkdown, the messageAboutToBeSent signal neccessary # wrapped sendMarkdown, the messageAboutToBeSent signal neccessary
# for local echoes would not be sent until a thread is free. # for local echoes would not be sent until a thread is free.
@futurize() @futurize(max_running=1)
def send(self): def send(self):
return self.net.talk( return self.net.talk(
self.nio.room_send, self.nio.room_send,

View File

@ -2,11 +2,12 @@
# This file is part of harmonyqml, licensed under GPLv3. # This file is part of harmonyqml, licensed under GPLv3.
import functools import functools
import logging import logging as log
import sys import sys
import time
import traceback import traceback
from concurrent.futures import Executor, Future from concurrent.futures import Executor, Future
from typing import Callable, List, Optional, Tuple, Union from typing import Callable, Deque, Optional, Tuple, Union
from PyQt5.QtCore import QObject, pyqtProperty, pyqtSignal, pyqtSlot from PyQt5.QtCore import QObject, pyqtProperty, pyqtSignal, pyqtSlot
@ -64,32 +65,79 @@ class PyQtFuture(QObject):
self.future.add_done_callback(fn) self.future.add_done_callback(fn)
_RUNNING: List[Tuple[Executor, Callable, tuple, dict]] = [] _Task = Tuple[Executor, Callable, Optional[tuple], Optional[dict]]
_RUNNING: Deque[_Task] = Deque()
_PENDING: Deque[_Task] = Deque()
def futurize(max_instances: Optional[int] = None, pyqt: bool = True def futurize(max_running: Optional[int] = None,
) -> Callable: consider_args: bool = False,
discard_if_max_running: bool = False,
pyqt: bool = True) -> Callable:
def decorator(func: Callable) -> Callable: def decorator(func: Callable) -> Callable:
@functools.wraps(func) @functools.wraps(func)
def wrapper(self, *args, **kws) -> Optional[PyQtFuture]: def wrapper(self, *args, **kws) -> Optional[PyQtFuture]:
task: _Task = (
self.pool,
func,
args if consider_args else None,
kws if consider_args else None,
)
def can_run_now() -> bool:
if max_running is not None and \
_RUNNING.count(task) >= max_running:
log.debug("!! Max %d tasks of this kind running: %r",
max_running, task[1:])
return False
if not consider_args or not _PENDING:
return True
log.debug(".. Pending: %r\n Queue: %r", task[1:], _PENDING)
candidate_task = next((
pending for pending in _PENDING
if pending[0] == self.pool and pending[1] == func
), None)
if candidate_task is None:
log.debug(">> No other candidate, starting: %r", task[1:])
return True
if candidate_task[2] == args and candidate_task[3] == kws:
log.debug(">> Candidate is us: %r", candidate_task[1:])
return True
log.debug("XX Other candidate: %r", candidate_task[1:])
return False
if not can_run_now() and discard_if_max_running:
log.debug("\\/ Discarding task: %r", task[1:])
return None
def run_and_catch_errs(): def run_and_catch_errs():
if not can_run_now():
log.debug("~~ Can't start now: %r", task[1:])
_PENDING.append(task)
while not can_run_now():
time.sleep(0.05)
_RUNNING.append(task)
log.debug("Starting: %r", task[1:])
# Without this, exceptions are silently ignored # Without this, exceptions are silently ignored
try: try:
return func(self, *args, **kws) return func(self, *args, **kws)
except Exception: except Exception:
traceback.print_exc() traceback.print_exc()
logging.error("Exiting thread/process due to exception.") log.error("Exiting thread/process due to exception.")
sys.exit(1) sys.exit(1)
finally: finally:
del _RUNNING[_RUNNING.index((self.pool, func, args, kws))] del _RUNNING[_RUNNING.index(task)]
if max_instances is not None and \
_RUNNING.count((self.pool, func, args, kws)) >= max_instances:
return None
_RUNNING.append((self.pool, func, args, kws))
future = self.pool.submit(run_and_catch_errs) future = self.pool.submit(run_and_catch_errs)
return PyQtFuture(future, self) if pyqt else future return PyQtFuture(future, self) if pyqt else future