Rename some filers and folder for clarity
This commit is contained in:
6
src/backend/models/__init__.py
Normal file
6
src/backend/models/__init__.py
Normal file
@@ -0,0 +1,6 @@
|
||||
from typing import Tuple, Type, Union
|
||||
|
||||
from .model_item import ModelItem
|
||||
|
||||
# last one: Tuple[Union[Type[ModelItem], Tuple[Type[ModelItem]]], str...]
|
||||
SyncId = Union[Type[ModelItem], Tuple[Type[ModelItem]], tuple]
|
238
src/backend/models/items.py
Normal file
238
src/backend/models/items.py
Normal file
@@ -0,0 +1,238 @@
|
||||
import asyncio
|
||||
import re
|
||||
from dataclasses import dataclass, field
|
||||
from datetime import datetime, timedelta
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict, List, Optional, Tuple, Type, Union
|
||||
from uuid import UUID
|
||||
|
||||
import lxml # nosec
|
||||
|
||||
import nio
|
||||
|
||||
from ..html_filter import HTML_FILTER
|
||||
from ..utils import AutoStrEnum, auto
|
||||
from .model_item import ModelItem
|
||||
|
||||
OptionalExceptionType = Union[Type[None], Type[Exception]]
|
||||
|
||||
|
||||
@dataclass
|
||||
class Account(ModelItem):
|
||||
user_id: str = field()
|
||||
display_name: str = ""
|
||||
avatar_url: str = ""
|
||||
first_sync_done: bool = False
|
||||
profile_updated: Optional[datetime] = None
|
||||
|
||||
def __lt__(self, other: "Account") -> bool:
|
||||
name = self.display_name or self.user_id[1:]
|
||||
other_name = other.display_name or other.user_id[1:]
|
||||
return name < other_name
|
||||
|
||||
@property
|
||||
def filter_string(self) -> str:
|
||||
return self.display_name
|
||||
|
||||
|
||||
@dataclass
|
||||
class Room(ModelItem):
|
||||
room_id: str = field()
|
||||
given_name: str = ""
|
||||
display_name: str = ""
|
||||
avatar_url: str = ""
|
||||
plain_topic: str = ""
|
||||
topic: str = ""
|
||||
inviter_id: str = ""
|
||||
inviter_name: str = ""
|
||||
inviter_avatar: str = ""
|
||||
left: bool = False
|
||||
typing_members: List[str] = field(default_factory=list)
|
||||
|
||||
encrypted: bool = False
|
||||
invite_required: bool = True
|
||||
guests_allowed: bool = True
|
||||
|
||||
can_invite: bool = False
|
||||
can_send_messages: bool = False
|
||||
can_set_name: bool = False
|
||||
can_set_topic: bool = False
|
||||
can_set_avatar: bool = False
|
||||
can_set_encryption: bool = False
|
||||
can_set_join_rules: bool = False
|
||||
can_set_guest_access: bool = False
|
||||
|
||||
# Event.serialized
|
||||
last_event: Optional[Dict[str, Any]] = field(default=None, repr=False)
|
||||
|
||||
def __lt__(self, other: "Room") -> bool:
|
||||
# Order: Invited rooms > joined rooms > left rooms.
|
||||
# Within these categories, sort by date then by name.
|
||||
# Left rooms may still have an inviter_id, so check left first.
|
||||
return (
|
||||
self.left,
|
||||
|
||||
other.inviter_id,
|
||||
|
||||
other.last_event["date"] if other.last_event else
|
||||
datetime.fromtimestamp(0),
|
||||
|
||||
self.display_name.lower() or self.room_id,
|
||||
) < (
|
||||
other.left,
|
||||
|
||||
self.inviter_id,
|
||||
|
||||
self.last_event["date"] if self.last_event else
|
||||
datetime.fromtimestamp(0),
|
||||
|
||||
other.display_name.lower() or other.room_id,
|
||||
)
|
||||
|
||||
@property
|
||||
def filter_string(self) -> str:
|
||||
return " ".join((
|
||||
self.display_name,
|
||||
self.topic,
|
||||
re.sub(r"<.*?>", "", self.last_event["inline_content"])
|
||||
if self.last_event else "",
|
||||
))
|
||||
|
||||
|
||||
@dataclass
|
||||
class Member(ModelItem):
|
||||
user_id: str = field()
|
||||
display_name: str = ""
|
||||
avatar_url: str = ""
|
||||
typing: bool = False
|
||||
power_level: int = 0
|
||||
invited: bool = False
|
||||
|
||||
def __lt__(self, other: "Member") -> bool:
|
||||
# Sort by name, but have members with higher power-level first and
|
||||
# invited-but-not-joined members last
|
||||
name = (self.display_name or self.user_id[1:]).lower()
|
||||
other_name = (other.display_name or other.user_id[1:]).lower()
|
||||
|
||||
return (
|
||||
self.invited, other.power_level, name,
|
||||
) < (
|
||||
other.invited, self.power_level, other_name,
|
||||
)
|
||||
|
||||
|
||||
@property
|
||||
def filter_string(self) -> str:
|
||||
return self.display_name
|
||||
|
||||
|
||||
class UploadStatus(AutoStrEnum):
|
||||
Uploading = auto()
|
||||
Caching = auto()
|
||||
Error = auto()
|
||||
|
||||
|
||||
@dataclass
|
||||
class Upload(ModelItem):
|
||||
uuid: UUID = field()
|
||||
task: asyncio.Task = field()
|
||||
monitor: nio.TransferMonitor = field()
|
||||
filepath: Path = field()
|
||||
|
||||
total_size: int = 0
|
||||
uploaded: int = 0
|
||||
speed: float = 0
|
||||
time_left: Optional[timedelta] = None
|
||||
|
||||
status: UploadStatus = UploadStatus.Uploading
|
||||
error: OptionalExceptionType = type(None)
|
||||
error_args: Tuple[Any, ...] = ()
|
||||
|
||||
start_date: datetime = field(init=False, default_factory=datetime.now)
|
||||
|
||||
|
||||
def __lt__(self, other: "Upload") -> bool:
|
||||
# Sort from newest upload to oldest.
|
||||
return self.start_date > other.start_date
|
||||
|
||||
|
||||
class TypeSpecifier(AutoStrEnum):
|
||||
none = auto()
|
||||
profile_change = auto()
|
||||
membership_change = auto()
|
||||
|
||||
|
||||
@dataclass
|
||||
class Event(ModelItem):
|
||||
source: Optional[nio.Event] = field()
|
||||
client_id: str = field()
|
||||
event_id: str = field()
|
||||
date: datetime = field()
|
||||
sender_id: str = field()
|
||||
sender_name: str = field()
|
||||
sender_avatar: str = field()
|
||||
|
||||
content: str = ""
|
||||
inline_content: str = ""
|
||||
|
||||
type_specifier: TypeSpecifier = TypeSpecifier.none
|
||||
|
||||
target_id: str = ""
|
||||
target_name: str = ""
|
||||
target_avatar: str = ""
|
||||
|
||||
is_local_echo: bool = False
|
||||
local_event_type: Optional[Type[nio.Event]] = None
|
||||
|
||||
media_url: str = ""
|
||||
media_title: str = ""
|
||||
media_width: int = 0
|
||||
media_height: int = 0
|
||||
media_duration: int = 0
|
||||
media_size: int = 0
|
||||
media_mime: str = ""
|
||||
media_crypt_dict: Dict[str, Any] = field(default_factory=dict)
|
||||
|
||||
thumbnail_url: str = ""
|
||||
thumbnail_width: int = 0
|
||||
thumbnail_height: int = 0
|
||||
thumbnail_crypt_dict: Dict[str, Any] = field(default_factory=dict)
|
||||
|
||||
def __post_init__(self) -> None:
|
||||
if not self.inline_content:
|
||||
self.inline_content = HTML_FILTER.filter_inline(self.content)
|
||||
|
||||
|
||||
def __lt__(self, other: "Event") -> bool:
|
||||
# Sort events from newest to oldest. return True means return False.
|
||||
return self.date > other.date
|
||||
|
||||
@property
|
||||
def event_type(self) -> Type:
|
||||
if self.local_event_type:
|
||||
return self.local_event_type
|
||||
|
||||
return type(self.source)
|
||||
|
||||
@property
|
||||
def links(self) -> List[str]:
|
||||
urls: List[str] = []
|
||||
|
||||
if self.content.strip():
|
||||
urls += [link[2] for link in lxml.html.iterlinks(self.content)]
|
||||
|
||||
if self.media_url:
|
||||
urls.append(self.media_url)
|
||||
|
||||
return urls
|
||||
|
||||
|
||||
@dataclass
|
||||
class Device(ModelItem):
|
||||
device_id: str = field()
|
||||
ed25519_key: str = field()
|
||||
trusted: bool = False
|
||||
blacklisted: bool = False
|
||||
display_name: str = ""
|
||||
last_seen_ip: str = ""
|
||||
last_seen_date: str = ""
|
111
src/backend/models/model.py
Normal file
111
src/backend/models/model.py
Normal file
@@ -0,0 +1,111 @@
|
||||
import logging as log
|
||||
import time
|
||||
from threading import Lock, Thread
|
||||
from typing import Any, Dict, Iterator, List, MutableMapping
|
||||
|
||||
from . import SyncId
|
||||
from ..pyotherside_events import ModelUpdated
|
||||
from .model_item import ModelItem
|
||||
|
||||
|
||||
class Model(MutableMapping):
|
||||
def __init__(self, sync_id: SyncId) -> None:
|
||||
self.sync_id: SyncId = sync_id
|
||||
self._data: Dict[Any, ModelItem] = {}
|
||||
|
||||
self._changed: bool = False
|
||||
self._sync_lock: Lock = Lock()
|
||||
self._sync_thread: Thread = Thread(target=self._sync_loop, daemon=True)
|
||||
self._sync_thread.start()
|
||||
|
||||
|
||||
def __repr__(self) -> str:
|
||||
try:
|
||||
from pprintpp import pformat
|
||||
except ImportError:
|
||||
from pprint import pformat # type: ignore
|
||||
|
||||
if isinstance(self.sync_id, tuple):
|
||||
sid = (self.sync_id[0].__name__, *self.sync_id[1:])
|
||||
else:
|
||||
sid = self.sync_id.__name__ # type: ignore
|
||||
|
||||
return "%s(sync_id=%s, %s)" % (
|
||||
type(self).__name__, sid, pformat(self._data),
|
||||
)
|
||||
|
||||
|
||||
def __str__(self) -> str:
|
||||
if isinstance(self.sync_id, tuple):
|
||||
reprs = tuple(repr(s) for s in self.sync_id[1:])
|
||||
sid = ", ".join((self.sync_id[0].__name__, *reprs))
|
||||
sid = f"({sid})"
|
||||
else:
|
||||
sid = self.sync_id.__name__
|
||||
|
||||
return f"{sid!s}: {len(self)} items"
|
||||
|
||||
|
||||
def __getitem__(self, key):
|
||||
return self._data[key]
|
||||
|
||||
|
||||
def __setitem__(self, key, value: ModelItem) -> None:
|
||||
new = value
|
||||
|
||||
if key in self:
|
||||
existing = dict(self[key].serialized) # copy to not alter with pop
|
||||
merged = {**existing, **value.serialized}
|
||||
|
||||
existing.pop("parent_model", None)
|
||||
merged.pop("parent_model", None)
|
||||
|
||||
if merged == existing:
|
||||
return
|
||||
|
||||
merged_init_kwargs = {**vars(self[key]), **vars(value)}
|
||||
merged_init_kwargs.pop("parent_model", None)
|
||||
new = type(value)(**merged_init_kwargs)
|
||||
|
||||
new.parent_model = self
|
||||
|
||||
with self._sync_lock:
|
||||
self._data[key] = new
|
||||
self._changed = True
|
||||
|
||||
|
||||
def __delitem__(self, key) -> None:
|
||||
with self._sync_lock:
|
||||
del self._data[key]
|
||||
self._changed = True
|
||||
|
||||
|
||||
def __iter__(self) -> Iterator:
|
||||
return iter(self._data)
|
||||
|
||||
|
||||
def __len__(self) -> int:
|
||||
return len(self._data)
|
||||
|
||||
|
||||
def _sync_loop(self) -> None:
|
||||
while True:
|
||||
time.sleep(0.25)
|
||||
|
||||
if self._changed:
|
||||
with self._sync_lock:
|
||||
log.debug("Syncing %s", self)
|
||||
self.sync_now()
|
||||
|
||||
|
||||
def sync_now(self) -> None:
|
||||
ModelUpdated(self.sync_id, self.serialized())
|
||||
self._changed = False
|
||||
|
||||
|
||||
def serialized(self) -> List[Dict[str, Any]]:
|
||||
return [item.serialized for item in sorted(self._data.values())]
|
||||
|
||||
|
||||
def __lt__(self, other: "Model") -> bool:
|
||||
return str(self.sync_id) < str(other.sync_id)
|
33
src/backend/models/model_item.py
Normal file
33
src/backend/models/model_item.py
Normal file
@@ -0,0 +1,33 @@
|
||||
from typing import Any, Dict, Optional
|
||||
|
||||
from ..utils import serialize_value_for_qml
|
||||
|
||||
|
||||
class ModelItem:
|
||||
def __new__(cls, *_args, **_kwargs) -> "ModelItem":
|
||||
from .model import Model
|
||||
cls.parent_model: Optional[Model] = None
|
||||
return super().__new__(cls)
|
||||
|
||||
|
||||
def __setattr__(self, name: str, value) -> None:
|
||||
super().__setattr__(name, value)
|
||||
|
||||
if name != "parent_model" and self.parent_model is not None:
|
||||
with self.parent_model._sync_lock:
|
||||
self.parent_model._changed = True
|
||||
|
||||
|
||||
def __delattr__(self, name: str) -> None:
|
||||
raise NotImplementedError()
|
||||
|
||||
|
||||
@property
|
||||
def serialized(self) -> Dict[str, Any]:
|
||||
return {
|
||||
name: serialize_value_for_qml(getattr(self, name))
|
||||
for name in dir(self)
|
||||
if not (
|
||||
name.startswith("_") or name in ("parent_model", "serialized")
|
||||
)
|
||||
}
|
61
src/backend/models/model_store.py
Normal file
61
src/backend/models/model_store.py
Normal file
@@ -0,0 +1,61 @@
|
||||
from typing import Dict, Iterator, MutableMapping, Set, Tuple, Type, Union
|
||||
|
||||
from dataclasses import dataclass, field
|
||||
|
||||
from . import SyncId
|
||||
from .model_item import ModelItem
|
||||
from .model import Model
|
||||
|
||||
KeyType = Union[Type[ModelItem], Tuple[Type, ...]]
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class ModelStore(MutableMapping):
|
||||
allowed_key_types: Set[KeyType] = field()
|
||||
|
||||
data: Dict[SyncId, Model] = field(init=False, default_factory=dict)
|
||||
|
||||
|
||||
def __getitem__(self, key: SyncId) -> Model:
|
||||
try:
|
||||
return self.data[key]
|
||||
except KeyError:
|
||||
if isinstance(key, tuple):
|
||||
for i in key:
|
||||
if not i:
|
||||
raise ValueError(f"Empty string in key: {key!r}")
|
||||
|
||||
key_type = (key[0],) + \
|
||||
tuple(type(el) for el in key[1:])
|
||||
else:
|
||||
key_type = key # type: ignore
|
||||
|
||||
if key_type not in self.allowed_key_types:
|
||||
raise TypeError(f"{key_type!r} not in allowed key types: "
|
||||
f"{self.allowed_key_types!r}")
|
||||
|
||||
model = Model(key)
|
||||
self.data[key] = model
|
||||
return model
|
||||
|
||||
|
||||
def __setitem__(self, key, item) -> None:
|
||||
raise NotImplementedError()
|
||||
|
||||
|
||||
def __delitem__(self, key: SyncId) -> None:
|
||||
del self.data[key]
|
||||
|
||||
|
||||
def __iter__(self) -> Iterator[SyncId]:
|
||||
return iter(self.data)
|
||||
|
||||
|
||||
def __len__(self) -> int:
|
||||
return len(self.data)
|
||||
|
||||
|
||||
def __str__(self) -> str:
|
||||
return "%s(\n %s\n)" % (
|
||||
type(self).__name__,
|
||||
"\n ".join(sorted(str(v) for v in self.values())),
|
||||
)
|
Reference in New Issue
Block a user