moment/src/python/image_provider.py

145 lines
4.1 KiB
Python
Raw Normal View History

# Copyright 2019 miruka
# This file is part of harmonyqml, licensed under LGPLv3.
import asyncio
import random
import re
from io import BytesIO
from pathlib import Path
from typing import Tuple
from urllib.parse import urlparse
from atomicfile import AtomicFile
from dataclasses import dataclass, field
from PIL import Image as PILImage
import nio
import pyotherside
from nio.api import ResizingMethod
Size = Tuple[int, int]
ImageData = Tuple[bytearray, Size, int] # last int: pyotherside format enum
@dataclass
class Thumbnail:
# pylint: disable=no-member
provider: "ImageProvider" = field()
mxc: str = field()
width: int = field()
height: int = field()
def __post_init__(self) -> None:
self.mxc = re.sub(r"#auto$", "", self.mxc)
if not re.match(r"^mxc://.+/.+", self.mxc):
raise ValueError(f"Invalid mxc URI: {self.mxc}")
@property
def server_size(self) -> Tuple[int, int]:
# https://matrix.org/docs/spec/client_server/latest#thumbnails
if self.width > 640 or self.height > 480:
return (800, 600)
if self.width > 320 or self.height > 240:
return (640, 480)
if self.width > 96 or self.height > 96:
return (320, 240)
if self.width > 32 or self.height > 32:
return (96, 96)
return (32, 32)
@property
def resize_method(self) -> ResizingMethod:
return ResizingMethod.scale \
if self.width > 96 or self.height > 96 else ResizingMethod.crop
@property
def http(self) -> str:
return nio.Api.mxc_to_http(self.mxc)
@property
def local_path(self) -> Path:
# pylint: disable=bad-string-format-type
parsed = urlparse(self.mxc)
name = "%s.%03d.%03d.%s" % (
parsed.path.lstrip("/"),
self.server_size[0],
self.server_size[1],
self.resize_method.value,
)
return self.provider.cache / parsed.netloc / name
async def download(self) -> bytes:
client = random.choice(
tuple(self.provider.app.backend.clients.values())
)
parsed = urlparse(self.mxc)
response = await client.thumbnail(
server_name = parsed.netloc,
media_id = parsed.path.lstrip("/"),
width = self.server_size[0],
height = self.server_size[1],
method = self.resize_method,
)
if isinstance(response, nio.ThumbnailError):
# Return a transparent 1x1 PNG
with BytesIO() as img_out:
PILImage.new("RGBA", (1, 1), (0, 0, 0, 0)).save(img_out, "PNG")
return img_out.getvalue()
body = response.body
if response.content_type not in ("image/jpeg", "image/png"):
with BytesIO(body) as img_in, BytesIO() as img_out:
PILImage.open(img_in).save(img_out, "PNG")
body = img_out.getvalue()
self.local_path.parent.mkdir(parents=True, exist_ok=True)
with AtomicFile(str(self.local_path), "wb") as file:
file.write(body)
return body
async def get_data(self) -> ImageData:
try:
body = self.local_path.read_bytes()
except FileNotFoundError:
body = await self.download()
with BytesIO(body) as img_in:
real_size = PILImage.open(img_in).size
return (bytearray(body), real_size, pyotherside.format_data)
class ImageProvider:
def __init__(self, app) -> None:
self.app = app
self.cache = Path(self.app.appdirs.user_cache_dir) / "thumbnails"
self.cache.mkdir(parents=True, exist_ok=True)
def get(self, image_id: str, requested_size: Size) -> ImageData:
if requested_size[0] < 1 or requested_size[1] < 1:
raise ValueError(f"width or height < 1: {requested_size!r}")
return asyncio.run_coroutine_threadsafe(
Thumbnail(self, image_id, *requested_size).get_data(),
self.app.loop
).result()