Improve upload code, based on nio PR #76
This commit is contained in:
parent
25ce08891d
commit
2123f5a66f
23
TESTS.md
23
TESTS.md
|
@ -1,10 +1,10 @@
|
|||
# Manual GUI tests
|
||||
|
||||
## Sidepane
|
||||
|
||||
All the following statements must be true.
|
||||
|
||||
|
||||
## Sidepane
|
||||
|
||||
For all tests:
|
||||
|
||||
- When the pane collapses, the overall spacing/margins/paddings is 0.
|
||||
|
@ -52,3 +52,22 @@ Shrink the window enough for the pane to be in reduced mode:
|
|||
- In a page or room, a left-to-right swipe gesture shows a full-window pane.
|
||||
- On the full-window pane, a right-to-left swipe goes back to the page/room.
|
||||
- On the full-window pane, tapping on a room/page properly goes to it.
|
||||
|
||||
|
||||
## Uploads
|
||||
|
||||
Verify uploads work:
|
||||
|
||||
- In non-encrypted rooms
|
||||
- In encrypted rooms
|
||||
|
||||
Verify these file types are uploaded and rendered correctly;
|
||||
and that the correct URL is opened in browser when clicking on them in a chat:
|
||||
|
||||
- Image that doesn't need thumbnailing
|
||||
- Non-SVG, non-animated image that meets the conditions for thumbnailing
|
||||
- Animated GIF that meets the condition for thumbnailing
|
||||
- Binary or text file
|
||||
|
||||
Verify uploaded media and their thumbnails are correctly cached on upload
|
||||
and read from cache.
|
||||
|
|
|
@ -9,5 +9,6 @@ html_sanitizer >= 1.7.3, < 2
|
|||
lxml >= 4.4.1, < 5
|
||||
mistune >= 0.8.4, < 0.9
|
||||
dataclasses >= 0.6, < 0.7; python_version < "3.7"
|
||||
pyfastcopy >= 1.0.3, < 2; python_version < "3.8"
|
||||
|
||||
git+http://github.com/mirukan/matrix-nio#egg=matrix-nio[e2e]
|
||||
|
|
|
@ -63,11 +63,6 @@ class UneededThumbnail(Exception):
|
|||
pass
|
||||
|
||||
|
||||
@dataclass
|
||||
class UnthumbnailableError(Exception):
|
||||
exception: Optional[Exception] = None
|
||||
|
||||
|
||||
@dataclass
|
||||
class BadMimeType(Exception):
|
||||
wanted: str = field()
|
||||
|
|
|
@ -1,5 +1,4 @@
|
|||
import asyncio
|
||||
import functools
|
||||
import html
|
||||
import io
|
||||
import logging as log
|
||||
|
@ -11,8 +10,7 @@ from datetime import datetime
|
|||
from functools import partial
|
||||
from pathlib import Path
|
||||
from typing import (
|
||||
Any, AsyncIterable, BinaryIO, DefaultDict, Dict, Optional, Set, Tuple,
|
||||
Type, Union,
|
||||
Any, DefaultDict, Dict, NamedTuple, Optional, Set, Tuple, Type, Union,
|
||||
)
|
||||
from urllib.parse import urlparse
|
||||
from uuid import uuid4
|
||||
|
@ -22,11 +20,13 @@ from PIL import Image as PILImage
|
|||
from pymediainfo import MediaInfo
|
||||
|
||||
import nio
|
||||
from nio.crypto import AsyncDataT as UploadData
|
||||
from nio.crypto import async_generator_from_data
|
||||
|
||||
from . import __about__, utils
|
||||
from .errors import (
|
||||
BadMimeType, InvalidUserInContext, MatrixError, UneededThumbnail,
|
||||
UnthumbnailableError, UserNotFound,
|
||||
UserNotFound,
|
||||
)
|
||||
from .html_filter import HTML_FILTER
|
||||
from .models.items import (
|
||||
|
@ -35,8 +35,20 @@ from .models.items import (
|
|||
from .models.model_store import ModelStore
|
||||
from .pyotherside_events import AlertRequested
|
||||
|
||||
UploadData = Union[bytes, BinaryIO, AsyncIterable[bytes]]
|
||||
CryptDict = Dict[str, Any]
|
||||
CryptDict = Dict[str, Any]
|
||||
|
||||
|
||||
class UploadReturn(NamedTuple):
|
||||
mxc: str
|
||||
mime: str
|
||||
decryption_dict: Dict[str, Any]
|
||||
|
||||
|
||||
class MatrixImageInfo(NamedTuple):
|
||||
w: int
|
||||
h: int
|
||||
mimetype: str
|
||||
size: int
|
||||
|
||||
|
||||
class MatrixClient(nio.AsyncClient):
|
||||
|
@ -191,6 +203,8 @@ class MatrixClient(nio.AsyncClient):
|
|||
|
||||
|
||||
async def send_file(self, room_id: str, path: Union[Path, str]) -> None:
|
||||
from .media_cache import Media, Thumbnail
|
||||
|
||||
path = Path(path)
|
||||
size = path.resolve().stat().st_size
|
||||
encrypt = room_id in self.encrypted_rooms
|
||||
|
@ -198,16 +212,17 @@ class MatrixClient(nio.AsyncClient):
|
|||
upload_item = Upload(path, total_size=size)
|
||||
self.models[Upload, room_id][upload_item.uuid] = upload_item
|
||||
|
||||
url, mime, crypt_dict = await self.upload_file(
|
||||
path, upload_item, encrypt=encrypt,
|
||||
url, mime, crypt_dict = await self.upload(
|
||||
path, filename=path.name, encrypt=encrypt,
|
||||
)
|
||||
|
||||
await self.media_cache.create_media(url, path.read_bytes())
|
||||
upload_item.status = UploadStatus.Caching
|
||||
await Media.from_existing_file(self.backend.media_cache, url, path)
|
||||
|
||||
kind = (mime or "").split("/")[0]
|
||||
|
||||
thumb_url: str = ""
|
||||
thumb_info: Dict[str, Any] = {}
|
||||
thumb_url: str = ""
|
||||
thumb_info: Optional[MatrixImageInfo] = None
|
||||
|
||||
content: dict = {
|
||||
"body": path.name,
|
||||
|
@ -231,23 +246,34 @@ class MatrixClient(nio.AsyncClient):
|
|||
content["msgtype"] = "m.image"
|
||||
|
||||
content["info"]["w"], content["info"]["h"] = (
|
||||
utils.svg_dimensions(path) if is_svg else
|
||||
await utils.svg_dimensions(path) if is_svg else
|
||||
PILImage.open(path).size
|
||||
)
|
||||
|
||||
try:
|
||||
thumb_data, thumb_url, thumb_info, thumb_crypt_dict = \
|
||||
await self.upload_thumbnail(
|
||||
path, upload_item, is_svg=is_svg, encrypt=encrypt,
|
||||
)
|
||||
except (UneededThumbnail, UnthumbnailableError):
|
||||
thumb_data, thumb_info = await self.generate_thumbnail(
|
||||
path, is_svg=is_svg,
|
||||
)
|
||||
except UneededThumbnail:
|
||||
pass
|
||||
except OSError as err:
|
||||
log.warning(f"Failed thumbnailing {path}: {err}")
|
||||
else:
|
||||
await self.media_cache.create_thumbnail(
|
||||
upload_item.status = UploadStatus.UploadingThumbnail
|
||||
|
||||
thumb_url, _, thumb_crypt_dict = await self.upload(
|
||||
thumb_data,
|
||||
filename = f"{path.stem}_sample{''.join(path.suffixes)}",
|
||||
encrypt = encrypt,
|
||||
)
|
||||
|
||||
upload_item.status = UploadStatus.CachingThumbnail
|
||||
|
||||
await Thumbnail.from_bytes(
|
||||
self.backend.media_cache,
|
||||
thumb_url,
|
||||
thumb_data,
|
||||
content["info"]["w"],
|
||||
content["info"]["h"],
|
||||
wanted_size = (content["info"]["w"], content["info"]["h"]),
|
||||
)
|
||||
|
||||
if encrypt:
|
||||
|
@ -258,7 +284,7 @@ class MatrixClient(nio.AsyncClient):
|
|||
else:
|
||||
content["info"]["thumbnail_url"] = thumb_url
|
||||
|
||||
content["info"]["thumbnail_info"] = thumb_info
|
||||
content["info"]["thumbnail_info"] = thumb_info._asdict()
|
||||
|
||||
elif kind == "audio":
|
||||
event_type = \
|
||||
|
@ -309,8 +335,10 @@ class MatrixClient(nio.AsyncClient):
|
|||
media_size = content["info"]["size"],
|
||||
media_mime = content["info"]["mimetype"],
|
||||
thumbnail_url = thumb_url,
|
||||
thumbnail_width = thumb_info.get("w", 0),
|
||||
thumbnail_height = thumb_info.get("h", 0),
|
||||
thumbnail_width =
|
||||
content["info"].get("thumbnail_info", {}).get("w", 0),
|
||||
thumbnail_height =
|
||||
content["info"].get("thumbnail_info", {}).get("h", 0),
|
||||
)
|
||||
|
||||
await self._send_message(room_id, uuid, content)
|
||||
|
@ -483,7 +511,6 @@ class MatrixClient(nio.AsyncClient):
|
|||
return response.room_id
|
||||
|
||||
|
||||
|
||||
async def room_forget(self, room_id: str) -> None:
|
||||
await super().room_leave(room_id)
|
||||
await super().room_forget(room_id)
|
||||
|
@ -492,141 +519,81 @@ class MatrixClient(nio.AsyncClient):
|
|||
self.models.pop((Member, room_id), None)
|
||||
|
||||
|
||||
async def encrypt_attachment(self, data: bytes) -> Tuple[bytes, CryptDict]:
|
||||
func = functools.partial(
|
||||
nio.crypto.attachments.encrypt_attachment,
|
||||
data,
|
||||
)
|
||||
|
||||
# Run in a separate thread
|
||||
return await asyncio.get_event_loop().run_in_executor(None, func)
|
||||
|
||||
|
||||
async def upload_thumbnail(
|
||||
self,
|
||||
path: Union[Path, str],
|
||||
item: Optional[Upload] = None,
|
||||
is_svg: bool = False,
|
||||
encrypt: bool = False,
|
||||
) -> Tuple[bytes, str, Dict[str, Any], CryptDict]:
|
||||
async def generate_thumbnail(
|
||||
self, data: UploadData, is_svg: bool = False,
|
||||
) -> Tuple[bytes, MatrixImageInfo]:
|
||||
|
||||
png_modes = ("1", "L", "P", "RGBA")
|
||||
|
||||
try:
|
||||
if is_svg:
|
||||
svg_width, svg_height = utils.svg_dimensions(path)
|
||||
data = b"".join([c async for c in async_generator_from_data(data)])
|
||||
is_svg = await utils.guess_mime(data) == "image/svg+xml"
|
||||
|
||||
thumb = PILImage.open(io.BytesIO(
|
||||
cairosvg.svg2png(
|
||||
url = str(path),
|
||||
parent_width = svg_width,
|
||||
parent_height = svg_height,
|
||||
),
|
||||
))
|
||||
else:
|
||||
thumb = PILImage.open(path)
|
||||
if is_svg:
|
||||
svg_width, svg_height = await utils.svg_dimensions(data)
|
||||
|
||||
small = thumb.width <= 800 and thumb.height <= 600
|
||||
is_jpg_png = thumb.format in ("JPEG", "PNG")
|
||||
jpgable_png = thumb.format == "PNG" and thumb.mode not in png_modes
|
||||
|
||||
if small and is_jpg_png and not jpgable_png and not is_svg:
|
||||
raise UneededThumbnail()
|
||||
|
||||
if item:
|
||||
item.status = UploadStatus.CreatingThumbnail
|
||||
|
||||
if not small:
|
||||
thumb.thumbnail((800, 600), PILImage.LANCZOS)
|
||||
|
||||
with io.BytesIO() as out:
|
||||
if thumb.mode in png_modes:
|
||||
thumb.save(out, "PNG", optimize=True)
|
||||
mime = "image/png"
|
||||
else:
|
||||
thumb.convert("RGB").save(out, "JPEG", optimize=True)
|
||||
mime = "image/jpeg"
|
||||
|
||||
data = out.getvalue()
|
||||
|
||||
if encrypt:
|
||||
if item:
|
||||
item.status = UploadStatus.EncryptingThumbnail
|
||||
|
||||
data, crypt_dict = await self.encrypt_attachment(data)
|
||||
upload_mime = "application/octet-stream"
|
||||
else:
|
||||
crypt_dict, upload_mime = {}, mime
|
||||
|
||||
if item:
|
||||
item.status = UploadStatus.UploadingThumbnail
|
||||
|
||||
return (
|
||||
data,
|
||||
await self.upload(data, upload_mime, Path(path).name),
|
||||
{
|
||||
"w": thumb.width,
|
||||
"h": thumb.height,
|
||||
"mimetype": mime,
|
||||
"size": len(data),
|
||||
},
|
||||
crypt_dict,
|
||||
)
|
||||
|
||||
except OSError as err:
|
||||
log.warning("Error when creating thumbnail: %s", err)
|
||||
raise UnthumbnailableError(err)
|
||||
|
||||
|
||||
async def upload_file(
|
||||
self,
|
||||
path: Union[Path, str],
|
||||
item: Optional[Upload] = None,
|
||||
encrypt: bool = False,
|
||||
) -> Tuple[str, str, CryptDict]:
|
||||
|
||||
with open(path, "rb") as file:
|
||||
mime = utils.guess_mime(file)
|
||||
|
||||
data: Union[BinaryIO, bytes]
|
||||
|
||||
if encrypt:
|
||||
if item:
|
||||
item.status = UploadStatus.Encrypting
|
||||
|
||||
data, crypt_dict = await self.encrypt_attachment(file.read())
|
||||
upload_mime = "application/octet-stream"
|
||||
else:
|
||||
data, crypt_dict, upload_mime = file, {}, mime
|
||||
|
||||
if item:
|
||||
item.status = UploadStatus.Uploading
|
||||
|
||||
return (
|
||||
await self.upload(data, upload_mime, Path(path).name),
|
||||
mime,
|
||||
crypt_dict,
|
||||
data = cairosvg.svg2png(
|
||||
bytestring = data,
|
||||
parent_width = svg_width,
|
||||
parent_height = svg_height,
|
||||
)
|
||||
|
||||
thumb = PILImage.open(io.BytesIO(data))
|
||||
|
||||
small = thumb.width <= 800 and thumb.height <= 600
|
||||
is_jpg_png = thumb.format in ("JPEG", "PNG")
|
||||
jpgable_png = thumb.format == "PNG" and thumb.mode not in png_modes
|
||||
|
||||
if small and is_jpg_png and not jpgable_png and not is_svg:
|
||||
raise UneededThumbnail()
|
||||
|
||||
if not small:
|
||||
thumb.thumbnail((800, 600), PILImage.LANCZOS)
|
||||
|
||||
with io.BytesIO() as out:
|
||||
if thumb.mode in png_modes:
|
||||
thumb.save(out, "PNG", optimize=True)
|
||||
mime = "image/png"
|
||||
else:
|
||||
thumb.convert("RGB").save(out, "JPEG", optimize=True)
|
||||
mime = "image/jpeg"
|
||||
|
||||
data = out.getvalue()
|
||||
|
||||
info = MatrixImageInfo(thumb.width, thumb.height, mime, len(data))
|
||||
return (data, info)
|
||||
|
||||
|
||||
async def upload(
|
||||
self, data: UploadData, mime: str, filename: Optional[str] = None,
|
||||
) -> str:
|
||||
response = await super().upload(data, mime, filename)
|
||||
self,
|
||||
data: UploadData,
|
||||
mime: Optional[str] = None,
|
||||
filename: Optional[str] = None,
|
||||
encrypt: bool = False,
|
||||
) -> UploadReturn:
|
||||
|
||||
mime = mime or await utils.guess_mime(data)
|
||||
|
||||
response, decryption_dict = await super().upload(
|
||||
data,
|
||||
"application/octet-stream" if encrypt else mime,
|
||||
filename,
|
||||
encrypt,
|
||||
)
|
||||
|
||||
if isinstance(response, nio.UploadError):
|
||||
raise MatrixError.from_nio(response)
|
||||
|
||||
return response.content_uri
|
||||
return UploadReturn(response.content_uri, mime, decryption_dict)
|
||||
|
||||
|
||||
async def set_avatar_from_file(self, path: Union[Path, str]) -> None:
|
||||
mime = utils.guess_mime(path)
|
||||
mime = await utils.guess_mime(path)
|
||||
|
||||
if mime.split("/")[0] != "image":
|
||||
raise BadMimeType(wanted="image/*", got=mime)
|
||||
|
||||
await self.set_avatar((await self.upload_file(path))[0])
|
||||
mxc, *_ = await self.upload_file(path, mime, Path(path).name)
|
||||
await self.set_avatar(mxc)
|
||||
|
||||
|
||||
async def import_keys(self, infile: str, passphrase: str) -> None:
|
||||
|
|
|
@ -2,6 +2,8 @@ import asyncio
|
|||
import functools
|
||||
import io
|
||||
import re
|
||||
import shutil
|
||||
import sys
|
||||
from dataclasses import dataclass, field
|
||||
from pathlib import Path
|
||||
from typing import Any, DefaultDict, Dict, Optional
|
||||
|
@ -15,6 +17,9 @@ import nio
|
|||
from .backend import Backend
|
||||
from .utils import Size
|
||||
|
||||
if sys.version_info < (3, 8):
|
||||
import pyfastcopy # noqa
|
||||
|
||||
CryptDict = Optional[Dict[str, Any]]
|
||||
|
||||
CONCURRENT_DOWNLOADS_LIMIT = asyncio.BoundedSemaphore(8)
|
||||
|
@ -23,10 +28,9 @@ ACCESS_LOCKS: DefaultDict[str, asyncio.Lock] = DefaultDict(asyncio.Lock)
|
|||
|
||||
@dataclass
|
||||
class Media:
|
||||
cache: "MediaCache" = field()
|
||||
mxc: str = field()
|
||||
data: Optional[bytes] = field(repr=False)
|
||||
crypt_dict: CryptDict = field(repr=False)
|
||||
cache: "MediaCache" = field()
|
||||
mxc: str = field()
|
||||
crypt_dict: CryptDict = field(repr=False)
|
||||
|
||||
|
||||
def __post_init__(self) -> None:
|
||||
|
@ -64,14 +68,13 @@ class Media:
|
|||
|
||||
|
||||
async def create(self) -> Path:
|
||||
if self.data is None:
|
||||
async with CONCURRENT_DOWNLOADS_LIMIT:
|
||||
self.data = await self._get_remote_data()
|
||||
async with CONCURRENT_DOWNLOADS_LIMIT:
|
||||
data = await self._get_remote_data()
|
||||
|
||||
self.local_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
async with aiofiles.open(self.local_path, "wb") as file:
|
||||
await file.write(self.data)
|
||||
await file.write(data)
|
||||
|
||||
return self.local_path
|
||||
|
||||
|
@ -103,13 +106,52 @@ class Media:
|
|||
return await asyncio.get_event_loop().run_in_executor(None, func)
|
||||
|
||||
|
||||
@classmethod
|
||||
async def from_existing_file(
|
||||
cls,
|
||||
cache: "MediaCache",
|
||||
mxc: str,
|
||||
existing: Path,
|
||||
overwrite: bool = False,
|
||||
**kwargs,
|
||||
) -> "Media":
|
||||
|
||||
media = cls(cache, mxc, {}, **kwargs) # type: ignore
|
||||
media.local_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
if not media.local_path.exists() or overwrite:
|
||||
func = functools.partial(shutil.copy, existing, media.local_path)
|
||||
await asyncio.get_event_loop().run_in_executor(None, func)
|
||||
|
||||
return media
|
||||
|
||||
|
||||
@classmethod
|
||||
async def from_bytes(
|
||||
cls,
|
||||
cache: "MediaCache",
|
||||
mxc: str,
|
||||
data: bytes,
|
||||
overwrite: bool = False,
|
||||
**kwargs,
|
||||
) -> "Media":
|
||||
|
||||
media = cls(cache, mxc, {}, **kwargs) # type: ignore
|
||||
media.local_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
if not media.local_path.exists() or overwrite:
|
||||
async with aiofiles.open(media.local_path, "wb") as file:
|
||||
await file.write(data)
|
||||
|
||||
return media
|
||||
|
||||
|
||||
@dataclass
|
||||
class Thumbnail(Media):
|
||||
cache: "MediaCache" = field()
|
||||
mxc: str = field()
|
||||
data: Optional[bytes] = field(repr=False)
|
||||
crypt_dict: CryptDict = field(repr=False)
|
||||
wanted_size: Size = field()
|
||||
cache: "MediaCache" = field()
|
||||
mxc: str = field()
|
||||
crypt_dict: CryptDict = field(repr=False)
|
||||
wanted_size: Size = field()
|
||||
|
||||
server_size: Optional[Size] = field(init=False, repr=False, default=None)
|
||||
|
||||
|
@ -207,19 +249,10 @@ class MediaCache:
|
|||
self.downloads_dir.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
|
||||
async def create_media(self, mxc: str, data: bytes) -> None:
|
||||
await Media(self, mxc, data, {}).create()
|
||||
|
||||
# These methods are for conveniant usage from QML
|
||||
|
||||
async def get_media(self, mxc: str, crypt_dict: CryptDict = None) -> Path:
|
||||
return await Media(self, mxc, None, crypt_dict).get()
|
||||
|
||||
|
||||
async def create_thumbnail(
|
||||
self, mxc: str, data: bytes, width: int, height: int,
|
||||
) -> None:
|
||||
thumb = Thumbnail(self, mxc, data, {}, (round(width), round(height)))
|
||||
await thumb.create()
|
||||
return await Media(self, mxc, crypt_dict).get()
|
||||
|
||||
|
||||
async def get_thumbnail(
|
||||
|
@ -228,6 +261,6 @@ class MediaCache:
|
|||
|
||||
thumb = Thumbnail(
|
||||
# QML sometimes pass float sizes, which matrix API doesn't like.
|
||||
self, mxc, None, crypt_dict, (round(width), round(height)),
|
||||
self, mxc, crypt_dict, (round(width), round(height)),
|
||||
)
|
||||
return await thumb.get()
|
||||
|
|
|
@ -104,19 +104,17 @@ class Member(ModelItem):
|
|||
|
||||
|
||||
class UploadStatus(AutoStrEnum):
|
||||
Starting = auto()
|
||||
Encrypting = auto()
|
||||
Uploading = auto()
|
||||
CreatingThumbnail = auto()
|
||||
EncryptingThumbnail = auto()
|
||||
UploadingThumbnail = auto()
|
||||
Failure = auto() # TODO
|
||||
Uploading = auto()
|
||||
Caching = auto()
|
||||
UploadingThumbnail = auto()
|
||||
CachingThumbnail = auto()
|
||||
Failure = auto() # TODO
|
||||
|
||||
|
||||
@dataclass
|
||||
class Upload(ModelItem):
|
||||
filepath: Path = field()
|
||||
status: UploadStatus = UploadStatus.Starting
|
||||
status: UploadStatus = UploadStatus.Uploading
|
||||
total_size: int = 0
|
||||
uploaded: int = 0
|
||||
|
||||
|
|
|
@ -11,11 +11,14 @@ from enum import Enum
|
|||
from enum import auto as autostr
|
||||
from pathlib import Path
|
||||
from types import ModuleType
|
||||
from typing import IO, Any, Callable, Dict, Tuple, Type, Union
|
||||
from typing import Any, Callable, Dict, Tuple, Type
|
||||
|
||||
import filetype
|
||||
from aiofiles.threadpool.binary import AsyncBufferedReader
|
||||
|
||||
from nio.crypto import AsyncDataT as File
|
||||
from nio.crypto import async_generator_from_data
|
||||
|
||||
File = Union[IO, bytes, str, Path]
|
||||
Size = Tuple[int, int]
|
||||
auto = autostr
|
||||
|
||||
|
@ -47,28 +50,28 @@ def dict_update_recursive(dict1: dict, dict2: dict) -> None:
|
|||
dict1[k] = dict2[k]
|
||||
|
||||
|
||||
def is_svg(file: File) -> bool:
|
||||
async def is_svg(file: File) -> bool:
|
||||
"""Return True if the file is a SVG. Uses lxml for detection."""
|
||||
|
||||
if isinstance(file, Path):
|
||||
file = str(file)
|
||||
chunks = [c async for c in async_generator_from_data(file)]
|
||||
|
||||
try:
|
||||
_, element = next(xml_etree.iterparse(file, ("start",)))
|
||||
return element.tag == "{http://www.w3.org/2000/svg}svg"
|
||||
except (StopIteration, xml_etree.ParseError):
|
||||
return False
|
||||
with io.BytesIO(b"".join(chunks)) as file:
|
||||
try:
|
||||
_, element = next(xml_etree.iterparse(file, ("start",)))
|
||||
return element.tag == "{http://www.w3.org/2000/svg}svg"
|
||||
except (StopIteration, xml_etree.ParseError):
|
||||
return False
|
||||
|
||||
|
||||
def svg_dimensions(file: File) -> Size:
|
||||
async def svg_dimensions(file: File) -> Size:
|
||||
"""Return the width & height or viewBox width & height for a SVG.
|
||||
If these properties are missing (broken file), ``(256, 256)`` is returned.
|
||||
"""
|
||||
|
||||
if isinstance(file, Path):
|
||||
file = str(file)
|
||||
chunks = [c async for c in async_generator_from_data(file)]
|
||||
|
||||
attrs = xml_etree.parse(file).getroot().attrib
|
||||
with io.BytesIO(b"".join(chunks)) as file:
|
||||
attrs = xml_etree.parse(file).getroot().attrib
|
||||
|
||||
try:
|
||||
width = round(float(attrs.get("width", attrs["viewBox"].split()[3])))
|
||||
|
@ -83,24 +86,33 @@ def svg_dimensions(file: File) -> Size:
|
|||
return (width, height)
|
||||
|
||||
|
||||
def guess_mime(file: File) -> str:
|
||||
async def guess_mime(file: File) -> str:
|
||||
"""Return the mime type for a file, or application/octet-stream if it
|
||||
can't be guessed.
|
||||
"""
|
||||
|
||||
if is_svg(file):
|
||||
return "image/svg+xml"
|
||||
|
||||
if isinstance(file, Path):
|
||||
file = str(file)
|
||||
elif isinstance(file, io.IOBase):
|
||||
if isinstance(file, io.IOBase):
|
||||
file.seek(0, 0)
|
||||
elif isinstance(file, AsyncBufferedReader):
|
||||
await file.seek(0, 0)
|
||||
|
||||
try:
|
||||
return filetype.guess_mime(file) or "application/octet-stream"
|
||||
first_chunk: bytes
|
||||
async for first_chunk in async_generator_from_data(file):
|
||||
break
|
||||
|
||||
# TODO: plaintext
|
||||
mime = filetype.guess_mime(first_chunk)
|
||||
|
||||
return mime or (
|
||||
"image/svg+xml" if await is_svg(file) else
|
||||
"application/octet-stream"
|
||||
)
|
||||
finally:
|
||||
if isinstance(file, io.IOBase):
|
||||
file.seek(0, 0)
|
||||
elif isinstance(file, AsyncBufferedReader):
|
||||
await file.seek(0, 0)
|
||||
|
||||
|
||||
def plain2html(text: str) -> str:
|
||||
|
|
|
@ -40,26 +40,20 @@ Rectangle {
|
|||
id: filenameLabel
|
||||
elide: Text.ElideRight
|
||||
text:
|
||||
model.status === "Starting" ?
|
||||
qsTr("Preparing %1...").arg(fileName) :
|
||||
|
||||
model.status === "Encrypting" ?
|
||||
qsTr("Encrypting %1...").arg(fileName) :
|
||||
|
||||
model.status === "Uploading" ?
|
||||
qsTr("Uploading %1...").arg(fileName) :
|
||||
|
||||
model.status === "CreatingThumbnail" ?
|
||||
qsTr("Generating thumbnail for %1...").arg(fileName) :
|
||||
|
||||
model.status === "EncryptingThumbnail" ?
|
||||
qsTr("Encrypting thumbnail for %1...").arg(fileName) :
|
||||
model.status === "Caching" ?
|
||||
qsTr("Caching %1...").arg(fileName) :
|
||||
|
||||
model.status === "UploadingThumbnail" ?
|
||||
qsTr("Uploading thumbnail for %1...").arg(fileName) :
|
||||
qsTr("Uploading %1 thumbnail...").arg(fileName) :
|
||||
|
||||
model.status === "CachingThumbnail" ?
|
||||
qsTr("Caching %1 thumbnail...").arg(fileName) :
|
||||
|
||||
model.status === "Failure" ?
|
||||
qsTr("Failed uploading %1.").arg(fileName) :
|
||||
qsTr("Uploading %1 failed").arg(fileName) :
|
||||
|
||||
qsTr("Invalid status for %1: %2")
|
||||
.arg(fileName).arg(model.status)
|
||||
|
|
Loading…
Reference in New Issue
Block a user